diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/AbstractDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/AbstractDao.java index f7e97e4..87c5e7d 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/AbstractDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/AbstractDao.java @@ -1,5 +1,9 @@ package ru.javaops.masterjava.persist.dao; +import org.skife.jdbi.v2.sqlobject.SqlQuery; +import org.skife.jdbi.v2.sqlobject.Transaction; +import ru.javaops.masterjava.persist.DBIProvider; + /** * gkislin * 27.10.2016 @@ -8,4 +12,14 @@ */ public interface AbstractDao { void clean(); + + @SqlQuery("SELECT nextval('global_seq')") + int getNextVal(); + + @Transaction + default int getSeqAndSkip(int step) { + int id = getNextVal(); + DBIProvider.getDBI().useHandle(h -> h.execute("ALTER SEQUENCE global_seq RESTART WITH " + (id + step))); + return id; + } } diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/CityDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/CityDao.java new file mode 100644 index 0000000..25b2bc8 --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/CityDao.java @@ -0,0 +1,32 @@ +package ru.javaops.masterjava.persist.dao; + +import one.util.streamex.IntStreamEx; +import org.skife.jdbi.v2.sqlobject.BindBean; +import org.skife.jdbi.v2.sqlobject.SqlBatch; +import org.skife.jdbi.v2.sqlobject.SqlUpdate; +import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize; +import ru.javaops.masterjava.persist.model.City; + +import java.util.List; + +/** + * Created by val on 2017-04-06. 
+ */ +public abstract class CityDao implements AbstractDao { + + @SqlBatch("INSERT INTO cities (id, value, value_id) VALUES (:id, :value, :valueId)" + + "ON CONFLICT DO NOTHING") + public abstract int[] insertBatch(@BindBean List cities, @BatchChunkSize int chunkSize); + + public List insertAndGetConflictValueId(List cities) { + int[] result = insertBatch(cities, cities.size()); + return IntStreamEx.range(0, cities.size()) + .filter(i -> result[i] == 0) + .mapToObj(index -> cities.get(index).getValueId()) + .toList(); + } + + @SqlUpdate("TRUNCATE cities") + @Override + public abstract void clean(); +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java new file mode 100644 index 0000000..a2be108 --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java @@ -0,0 +1,40 @@ +package ru.javaops.masterjava.persist.dao; + +import com.bertoncelj.jdbi.entitymapper.EntityMapperFactory; +import one.util.streamex.IntStreamEx; +import org.skife.jdbi.v2.sqlobject.Bind; +import org.skife.jdbi.v2.sqlobject.BindBean; +import org.skife.jdbi.v2.sqlobject.SqlBatch; +import org.skife.jdbi.v2.sqlobject.SqlUpdate; +import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize; +import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapperFactory; +import ru.javaops.masterjava.persist.model.Group; + +import java.util.List; + +/** + * Created by val on 2017-04-06. 
+ */ +@RegisterMapperFactory(EntityMapperFactory.class) +public abstract class GroupDao implements AbstractDao{ + + @SqlBatch("INSERT INTO groups (id, name, type, project_id) VALUES (:id, :name, CAST(:type AS group_type), :projectId)" + + "ON CONFLICT DO NOTHING") + public abstract int[] insertBatch(@BindBean List groups, @BatchChunkSize int chunkSize); + + @SqlBatch("INSERT INTO user_group (user_email, group_name) VALUES (:userEmail, :name)" + + "ON CONFLICT DO NOTHING") + public abstract int[] updateGroups(@Bind("userEmail") String email, @Bind("name") List groups); + + public List insertAndGetConflictName(List groups) { + int[] result = insertBatch(groups, groups.size()); + return IntStreamEx.range(0, groups.size()) + .filter(i -> result[i] == 0) + .mapToObj(index -> groups.get(index).getName()) + .toList(); + } + + @SqlUpdate("TRUNCATE groups") + @Override + public abstract void clean(); +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/ProjectDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/ProjectDao.java new file mode 100644 index 0000000..d3cea99 --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/ProjectDao.java @@ -0,0 +1,23 @@ +package ru.javaops.masterjava.persist.dao; + +import com.bertoncelj.jdbi.entitymapper.EntityMapperFactory; +import org.skife.jdbi.v2.sqlobject.BindBean; +import org.skife.jdbi.v2.sqlobject.SqlQuery; +import org.skife.jdbi.v2.sqlobject.SqlUpdate; +import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapperFactory; +import ru.javaops.masterjava.persist.model.Project; + +/** + * Created by val on 2017-04-06. 
+ */ +@RegisterMapperFactory(EntityMapperFactory.class) +public abstract class ProjectDao implements AbstractDao{ + + @SqlQuery("INSERT INTO projects (id, description, name) VALUES (:id, :description, :name)" + + "ON CONFLICT ON CONSTRAINT name_unique DO UPDATE SET name=EXCLUDED.name RETURNING id") + public abstract int insertWitId(@BindBean Project project); + + @SqlUpdate("TRUNCATE projects") + @Override + public abstract void clean(); +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java index 1047ebe..787ebeb 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java @@ -5,9 +5,9 @@ import org.skife.jdbi.v2.sqlobject.*; import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize; import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapperFactory; -import ru.javaops.masterjava.persist.DBIProvider; import ru.javaops.masterjava.persist.model.User; +import java.util.Collection; import java.util.List; /** @@ -29,43 +29,43 @@ public User insert(User user) { return user; } - @SqlQuery("SELECT nextval('user_seq')") - abstract int getNextVal(); - - @Transaction - public int getSeqAndSkip(int step) { - int id = getNextVal(); - DBIProvider.getDBI().useHandle(h -> h.execute("ALTER SEQUENCE user_seq RESTART WITH " + (id + step))); - return id; - } - - @SqlUpdate("INSERT INTO users (full_name, email, flag) VALUES (:fullName, :email, CAST(:flag AS USER_FLAG)) ") + @SqlUpdate("INSERT INTO users (full_name, email, flag, city) VALUES (:fullName, :email, CAST(:flag AS USER_FLAG), :city) ") @GetGeneratedKeys abstract int insertGeneratedId(@BindBean User user); - @SqlUpdate("INSERT INTO users (id, full_name, email, flag) VALUES (:id, :fullName, :email, CAST(:flag AS USER_FLAG)) ") + @SqlUpdate("INSERT INTO users (id, full_name, email, flag, city) VALUES (:id, :fullName, :email, 
CAST(:flag AS USER_FLAG), :city) ") abstract void insertWitId(@BindBean User user); @SqlQuery("SELECT * FROM users ORDER BY full_name, email LIMIT :it") public abstract List getWithLimit(@Bind int limit); // http://stackoverflow.com/questions/13223820/postgresql-delete-all-content - @SqlUpdate("TRUNCATE users") + @SqlUpdate("TRUNCATE users CASCADE") @Override public abstract void clean(); // https://habrahabr.ru/post/264281/ - @SqlBatch("INSERT INTO users (id, full_name, email, flag) VALUES (:id, :fullName, :email, CAST(:flag AS USER_FLAG))" + + @SqlBatch("INSERT INTO users (id, full_name, email, flag, city) VALUES (:id, :fullName, :email, CAST(:flag AS USER_FLAG), :city)" + "ON CONFLICT DO NOTHING") // "ON CONFLICT (email) DO UPDATE SET full_name=:fullName, flag=CAST(:flag AS USER_FLAG)") - public abstract int[] insertBatch(@BindBean List users, @BatchChunkSize int chunkSize); + public abstract int[] insertBatch(@BindBean Collection users, @BatchChunkSize int chunkSize); - public List insertAndGetConflictEmails(List users) { + @SqlBatch("INSERT INTO users (id, full_name, email, flag, city) " + + "SELECT :id, :fullName, :email, CAST(:flag AS USER_FLAG), :city " + + "WHERE (SELECT value_id FROM cities WHERE value_id = :city) IS NOT NULL " + + "ON CONFLICT DO NOTHING") + public abstract int[] insertBatchWithCity(@BindBean Collection users, @BatchChunkSize int chunkSize); + + + public List insertAndGetConflicts(List users) { int[] result = insertBatch(users, users.size()); return IntStreamEx.range(0, users.size()) .filter(i -> result[i] == 0) - .mapToObj(index -> users.get(index).getEmail()) + .mapToObj(users::get) .toList(); } + + + } diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/City.java b/persist/src/main/java/ru/javaops/masterjava/persist/model/City.java new file mode 100644 index 0000000..f92a7dc --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/City.java @@ -0,0 +1,23 @@ +package 
ru.javaops.masterjava.persist.model; + +import com.bertoncelj.jdbi.entitymapper.Column; +import lombok.*; + +/** + * Created by val on 2017-04-05. + */ +@Data +@RequiredArgsConstructor +@EqualsAndHashCode(callSuper = true) +@NoArgsConstructor +public class City extends BaseEntity{ + + private @NonNull String value; + @Column("value_id") + private @NonNull String valueId; + + public City(Integer id, String value, String valueId) { + this(value, valueId); + this.id = id; + } +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/Group.java b/persist/src/main/java/ru/javaops/masterjava/persist/model/Group.java new file mode 100644 index 0000000..a1c30c0 --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/Group.java @@ -0,0 +1,24 @@ +package ru.javaops.masterjava.persist.model; + +import com.bertoncelj.jdbi.entitymapper.Column; +import lombok.*; + +/** + * Created by val on 2017-04-05. + */ +@Data +@RequiredArgsConstructor +@EqualsAndHashCode(callSuper = true) +@NoArgsConstructor +public class Group extends BaseEntity { + + private @NonNull String name; + private @NonNull GroupType type; + @Column("project_id") + private @NonNull Integer projectId; + + public Group(Integer id, String name, GroupType type, Integer projectId) { + this(name, type, projectId); + this.id = id; + } +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/GroupType.java b/persist/src/main/java/ru/javaops/masterjava/persist/model/GroupType.java new file mode 100644 index 0000000..ed0512f --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/GroupType.java @@ -0,0 +1,10 @@ +package ru.javaops.masterjava.persist.model; + +/** + * Created by val on 2017-04-05. 
+ */ +public enum GroupType { + REGISTERING, + CURRENT, + FINISHED +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/Project.java b/persist/src/main/java/ru/javaops/masterjava/persist/model/Project.java new file mode 100644 index 0000000..123ef64 --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/Project.java @@ -0,0 +1,24 @@ +package ru.javaops.masterjava.persist.model; + +import lombok.*; + +import java.util.List; + +/** + * Created by val on 2017-04-05. + */ +@Data +@RequiredArgsConstructor +@EqualsAndHashCode(callSuper = true) +@NoArgsConstructor +public class Project extends BaseEntity{ + + private @NonNull String description; + private @NonNull String name; + private List groups; + + public Project(Integer id, String description, String name) { + this(description, name); + this.id = id; + } +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/User.java b/persist/src/main/java/ru/javaops/masterjava/persist/model/User.java index 927a299..144c8db 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/model/User.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/User.java @@ -3,6 +3,8 @@ import com.bertoncelj.jdbi.entitymapper.Column; import lombok.*; +import java.util.List; + @Data @RequiredArgsConstructor @EqualsAndHashCode(callSuper = true) @@ -12,9 +14,11 @@ public class User extends BaseEntity { private @NonNull String fullName; private @NonNull String email; private @NonNull UserFlag flag; + private @NonNull String city; + private List groups; - public User(Integer id, String fullName, String email, UserFlag flag) { - this(fullName, email, flag); + public User(Integer id, String fullName, String email, UserFlag flag, String city) { + this(fullName, email, flag, city); this.id=id; } } \ No newline at end of file diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/UserFlag.java 
b/persist/src/main/java/ru/javaops/masterjava/persist/model/UserFlag.java index bc2f691..31ecdf4 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/model/UserFlag.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/UserFlag.java @@ -7,5 +7,5 @@ public enum UserFlag { active, deleted, - superuser; + superuser } diff --git a/persist/src/test/java/ru/javaops/masterjava/persist/UserTestData.java b/persist/src/test/java/ru/javaops/masterjava/persist/UserTestData.java index bc80bba..4a3f671 100644 --- a/persist/src/test/java/ru/javaops/masterjava/persist/UserTestData.java +++ b/persist/src/test/java/ru/javaops/masterjava/persist/UserTestData.java @@ -21,12 +21,12 @@ public class UserTestData { public static List FIST5_USERS; public static void init() { - ADMIN = new User("Admin", "admin@javaops.ru", UserFlag.superuser); - DELETED = new User("Deleted", "deleted@yandex.ru", UserFlag.deleted); - FULL_NAME = new User("Full Name", "gmail@gmail.com", UserFlag.active); - USER1 = new User("User1", "user1@gmail.com", UserFlag.active); - USER2 = new User("User2", "user2@yandex.ru", UserFlag.active); - USER3 = new User("User3", "user3@yandex.ru", UserFlag.active); + ADMIN = new User(100001,"Admin", "admin@javaops.ru", UserFlag.superuser, "msk"); + DELETED = new User(100002,"Deleted", "deleted@yandex.ru", UserFlag.deleted, "kiv"); + FULL_NAME = new User(100003,"Full Name", "gmail@gmail.com", UserFlag.active, "msk"); + USER1 = new User(100004, "User1", "user1@gmail.com", UserFlag.active, "kiv"); + USER2 = new User(100005, "User2", "user2@yandex.ru", UserFlag.active, "spb"); + USER3 = new User(100006, "User3", "user3@yandex.ru", UserFlag.active, "mnk"); FIST5_USERS = ImmutableList.of(ADMIN, DELETED, FULL_NAME, USER1, USER2); } diff --git a/persist/src/test/java/ru/javaops/masterjava/persist/dao/GroupDaoTest.java b/persist/src/test/java/ru/javaops/masterjava/persist/dao/GroupDaoTest.java new file mode 100644 index 0000000..547300f --- /dev/null +++ 
b/persist/src/test/java/ru/javaops/masterjava/persist/dao/GroupDaoTest.java @@ -0,0 +1,12 @@ +package ru.javaops.masterjava.persist.dao; + +/** + * Created by val on 2017-04-06. + */ +public class GroupDaoTest extends AbstractDaoTest { + + public GroupDaoTest() { + super(GroupDao.class); + } + +} \ No newline at end of file diff --git a/persist/src/test/java/ru/javaops/masterjava/persist/dao/ProjectDaoTest.java b/persist/src/test/java/ru/javaops/masterjava/persist/dao/ProjectDaoTest.java new file mode 100644 index 0000000..2cc020a --- /dev/null +++ b/persist/src/test/java/ru/javaops/masterjava/persist/dao/ProjectDaoTest.java @@ -0,0 +1,22 @@ +package ru.javaops.masterjava.persist.dao; + +import org.junit.Test; +import ru.javaops.masterjava.persist.model.Project; + +/** + * Created by val on 2017-04-06. + */ +public class ProjectDaoTest extends AbstractDaoTest{ + + public ProjectDaoTest() { + super(ProjectDao.class); + } + + @Test + public void insertWitId() throws Exception { + Project project = new Project(100201, "Topjava", "topjava"); + System.out.println(dao.insertWitId(project)); + + } + +} \ No newline at end of file diff --git a/persist/src/test/java/ru/javaops/masterjava/persist/dao/UserDaoTest.java b/persist/src/test/java/ru/javaops/masterjava/persist/dao/UserDaoTest.java index 1dd6ed4..aa9c5f0 100644 --- a/persist/src/test/java/ru/javaops/masterjava/persist/dao/UserDaoTest.java +++ b/persist/src/test/java/ru/javaops/masterjava/persist/dao/UserDaoTest.java @@ -1,13 +1,9 @@ package ru.javaops.masterjava.persist.dao; import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import ru.javaops.masterjava.persist.UserTestData; -import ru.javaops.masterjava.persist.model.User; - -import java.util.List; import static ru.javaops.masterjava.persist.UserTestData.FIST5_USERS; @@ -26,7 +22,7 @@ public static void init() throws Exception { UserTestData.init(); } - @Before + /*@Before public void setUp() throws Exception { 
UserTestData.setUp(); } @@ -42,7 +38,7 @@ public void insertBatch() throws Exception { dao.clean(); dao.insertBatch(FIST5_USERS, 3); Assert.assertEquals(5, dao.getWithLimit(100).size()); - } + }*/ @Test public void getSeqAndSkip() throws Exception { @@ -50,4 +46,13 @@ public void getSeqAndSkip() throws Exception { int seq2 = dao.getSeqAndSkip(1); Assert.assertEquals(5, seq2 - seq1); } + + @Test + public void insertBatchWithCity() throws Exception { + for (int x: dao.insertBatchWithCity(FIST5_USERS, 3)) { + System.out.println(x); + } + + Assert.assertEquals(3, dao.getWithLimit(100).size()); + } } \ No newline at end of file diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java new file mode 100644 index 0000000..9db386b --- /dev/null +++ b/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java @@ -0,0 +1,72 @@ +package ru.javaops.masterjava.export; + +import lombok.extern.slf4j.Slf4j; +import ru.javaops.masterjava.persist.DBIProvider; +import ru.javaops.masterjava.persist.dao.CityDao; +import ru.javaops.masterjava.persist.model.City; +import ru.javaops.masterjava.xml.util.StaxStreamProcessor; + +import javax.xml.stream.events.XMLEvent; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +/** + * Created by val on 2017-04-06. 
+ */ +@Slf4j +public class CityExport { + private final CityDao cityDao = DBIProvider.getDao(CityDao.class); + + public List process(final ExecutorService executorService, + final StaxStreamProcessor processor, + int chunkSize) throws Exception { + + return new Callable>() { + + @Override + public List call() throws Exception { + + List futures = new ArrayList<>(); + int id = cityDao.getSeqAndSkip(chunkSize); + List chunk = new ArrayList<>(chunkSize); + + // Cities loop + String tag; + while ((tag = processor.doUntilAny(XMLEvent.START_ELEMENT, "City", "Users")) != null) { + + if (tag.equals("Users")) break; + + final String valueId = processor.getAttribute("id"); + final String value = processor.getText(); + + final City city = new City(id++, value, valueId); + chunk.add(city); + + if (chunk.size() == chunkSize) { + futures.addAll(submit(chunk)); + chunk = new ArrayList<>(chunkSize); + id = cityDao.getSeqAndSkip(chunkSize); + } + } + if (!chunk.isEmpty()) + futures.addAll(submit(chunk)); + return futures; + } + + private List submit(List cities) throws Exception { + Future> chunkFuture = executorService.submit(() -> { + List stringsResult = new ArrayList<>(); + cityDao.insertAndGetConflictValueId(cities) + .forEach(s -> stringsResult.add("city with id " + s + " already present")); + return stringsResult; + }); + log.info("Submit " + cities.size() + " cities"); + //return List with fail messages + return chunkFuture.get(); + } + }.call(); + } +} diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/PayloadExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/PayloadExport.java new file mode 100644 index 0000000..03a86a0 --- /dev/null +++ b/web/export/src/main/java/ru/javaops/masterjava/export/PayloadExport.java @@ -0,0 +1,42 @@ +package ru.javaops.masterjava.export; + +import lombok.extern.slf4j.Slf4j; +import ru.javaops.masterjava.xml.util.StaxStreamProcessor; + +import javax.xml.stream.XMLStreamException; +import java.io.InputStream; 
+import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Created by val on 2017-04-06. + */ +@Slf4j +public class PayloadExport { + + private static final int NUMBER_THREADS = 4; + private final ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_THREADS); + + private final ProjectGroupExport projectGroupExport = new ProjectGroupExport(); + private final CityExport cityExport = new CityExport(); + private final UserExport userExport = new UserExport(); + + public List process (final InputStream is, int chunkSize) throws XMLStreamException { + log.info("Start processing with chunkSize=" + chunkSize); + + final StaxStreamProcessor processor = new StaxStreamProcessor(is); + List result = new ArrayList<>(); + + try { + result.addAll(projectGroupExport.process(executorService, processor, chunkSize)); + result.addAll(cityExport.process(executorService, processor, chunkSize)); + result.addAll(userExport.process(executorService, processor, chunkSize)); + } catch (Exception e) { + e.printStackTrace(); + } + + return result; + } +} diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/ProjectGroupExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/ProjectGroupExport.java new file mode 100644 index 0000000..ffb7073 --- /dev/null +++ b/web/export/src/main/java/ru/javaops/masterjava/export/ProjectGroupExport.java @@ -0,0 +1,103 @@ +package ru.javaops.masterjava.export; + +import lombok.extern.slf4j.Slf4j; +import ru.javaops.masterjava.persist.DBIProvider; +import ru.javaops.masterjava.persist.dao.GroupDao; +import ru.javaops.masterjava.persist.dao.ProjectDao; +import ru.javaops.masterjava.persist.model.Group; +import ru.javaops.masterjava.persist.model.GroupType; +import ru.javaops.masterjava.persist.model.Project; +import ru.javaops.masterjava.xml.util.StaxStreamProcessor; + +import javax.xml.stream.events.XMLEvent; +import 
java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +/** + * Created by val on 2017-04-06. + */ +@Slf4j +public class ProjectGroupExport { + private final GroupDao groupDao = DBIProvider.getDao(GroupDao.class); + private final ProjectDao projectDao = DBIProvider.getDao(ProjectDao.class); + + public List process(final ExecutorService executorService, + final StaxStreamProcessor processor, + int chunkSize) throws Exception { + + return new Callable>() { + + @Override + public List call() throws Exception { + + List futures = new ArrayList<>(); + int id = projectDao.getSeqAndSkip(chunkSize); + + // Projects loop + String tag = processor.doUntilAny(XMLEvent.START_ELEMENT, "Project", "Cities"); + while (tag != null) { + + if (tag.equals("Cities")) break; + + final String name = processor.getAttribute("name"); + processor.doUntil(XMLEvent.START_ELEMENT, "description"); + final String description = processor.getReader().getElementText(); + + final Project project = new Project(id++, description, name); + + // Groups loop + List groups = new ArrayList<>(); + while ((tag = processor.doUntilAny(XMLEvent.START_ELEMENT, "Project", "Group", "Cities")) != null) { + + if (tag.equals("Cities") || tag.equals("Project")) break; + + final String groupName = processor.getAttribute("name"); + final GroupType type = GroupType.valueOf(processor.getAttribute("type")); + + groups.add(new Group(id++, groupName, type, project.getId())); + + if (groups.size() == chunkSize - 1) { + id = projectDao.getSeqAndSkip(chunkSize); + } + } + project.setGroups(groups); + + futures.addAll(submit(project)); + } + return futures; + } + + private List submit(Project project) throws Exception { + Future> chunkFuture = executorService.submit(() -> { + List stringsResult = new ArrayList<>(); + + //add project to DB + //insertWitId on conflict return id for row already present in DB + int resultId = projectDao.insertWitId(project); + if (resultId != project.getId()) { + 
stringsResult.add("project with name " + project.getName() + " already present"); + //change projectId for project & groups to actual + project.setId(resultId); + project.getGroups().forEach(g -> g.setProjectId(resultId)); + } + + //add groups to DB by chunk + List conflictNames = new ArrayList<>(); + int groupSize = project.getGroups().size(); + for (int i = 0; i < groupSize / chunkSize + 1; i++) { + int start = i * chunkSize; + int end = (i + 1) * chunkSize > groupSize ? groupSize : (i + 1) * chunkSize; + List groupsChunk = project.getGroups().subList(start, end); + conflictNames.addAll(groupDao.insertAndGetConflictName(groupsChunk)); + } + conflictNames.forEach(s -> stringsResult.add(" - group with name " + s + " for project " + project.getName() + " already present")); + return stringsResult; + }); + log.info("Submit project " + project.getName() + " with " + project.getGroups().size() + " groups"); + //return List with fail messages + return chunkFuture.get(); + } + }.call(); + } +} diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/UploadServlet.java b/web/export/src/main/java/ru/javaops/masterjava/export/UploadServlet.java index 9a8b98d..8a67c21 100644 --- a/web/export/src/main/java/ru/javaops/masterjava/export/UploadServlet.java +++ b/web/export/src/main/java/ru/javaops/masterjava/export/UploadServlet.java @@ -22,8 +22,7 @@ @Slf4j public class UploadServlet extends HttpServlet { private static final int CHUNK_SIZE = 2000; - - private final UserExport userExport = new UserExport(); + private final PayloadExport payloadExport = new PayloadExport(); @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { @@ -42,8 +41,8 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S } else { Part filePart = req.getPart("fileToUpload"); try (InputStream is = filePart.getInputStream()) { - List failed = userExport.process(is, chunkSize); - log.info("Failed users: " 
+ failed); + List failed = payloadExport.process(is, chunkSize); + log.info("Failed uploads: " + failed); final WebContext webContext = new WebContext(req, resp, req.getServletContext(), req.getLocale(), ImmutableMap.of("failed", failed)); diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java index 0a2188b..d1bafdf 100644 --- a/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java +++ b/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java @@ -1,22 +1,18 @@ package ru.javaops.masterjava.export; -import lombok.Value; +import com.google.common.base.Splitter; import lombok.extern.slf4j.Slf4j; -import one.util.streamex.StreamEx; import ru.javaops.masterjava.persist.DBIProvider; +import ru.javaops.masterjava.persist.dao.GroupDao; import ru.javaops.masterjava.persist.dao.UserDao; import ru.javaops.masterjava.persist.model.User; import ru.javaops.masterjava.persist.model.UserFlag; import ru.javaops.masterjava.xml.util.StaxStreamProcessor; -import javax.xml.stream.XMLStreamException; import javax.xml.stream.events.XMLEvent; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; /** @@ -25,83 +21,61 @@ */ @Slf4j public class UserExport { - - private static final int NUMBER_THREADS = 4; - private final ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_THREADS); private final UserDao userDao = DBIProvider.getDao(UserDao.class); + private final GroupDao groupDao = DBIProvider.getDao(GroupDao.class); - @Value - public static class FailedEmail { - public String emailOrRange; - public String reason; - - @Override - public String toString() { - return emailOrRange + " : " + reason; - } - } - - public List process(final 
InputStream is, int chunkSize) throws XMLStreamException { - log.info("Start proseccing with chunkSize=" + chunkSize); + public List process(final ExecutorService executorService, + final StaxStreamProcessor processor, + int chunkSize) throws Exception { - return new Callable>() { - class ChunkFuture { - String emailRange; - Future> future; - - public ChunkFuture(List chunk, Future> future) { - this.future = future; - this.emailRange = chunk.get(0).getEmail(); - if (chunk.size() > 1) { - this.emailRange += '-' + chunk.get(chunk.size() - 1).getEmail(); - } - } - } + return new Callable>() { @Override - public List call() throws XMLStreamException { - List futures = new ArrayList<>(); + public List call() throws Exception { + List futures = new ArrayList<>(); int id = userDao.getSeqAndSkip(chunkSize); - List chunk = new ArrayList<>(chunkSize); - final StaxStreamProcessor processor = new StaxStreamProcessor(is); + Map> chunk = new HashMap<>(chunkSize); while (processor.doUntil(XMLEvent.START_ELEMENT, "User")) { final String email = processor.getAttribute("email"); + final String city = processor.getAttribute("city"); + String groupRefs = processor.getAttribute("groupRefs"); final UserFlag flag = UserFlag.valueOf(processor.getAttribute("flag")); - final String fullName = processor.getReader().getElementText(); - final User user = new User(id++, fullName, email, flag); - chunk.add(user); + final String fullName = processor.getText(); + final User user = new User(id++, fullName, email, flag, city); + chunk.put(user, Splitter.on(' ').splitToList(groupRefs)); if (chunk.size() == chunkSize) { - futures.add(submit(chunk)); - chunk = new ArrayList<>(chunkSize); + futures.addAll(submit(chunk)); + chunk = new HashMap<>(chunkSize); id = userDao.getSeqAndSkip(chunkSize); } } if (!chunk.isEmpty()) { - futures.add(submit(chunk)); + futures.addAll(submit(chunk)); } - - List failed = new ArrayList<>(); - futures.forEach(cf -> { - try { - 
failed.addAll(StreamEx.of(cf.future.get()).map(email -> new FailedEmail(email, "already present")).toList()); - log.info(cf.emailRange + " successfully executed"); - } catch (Exception e) { - log.error(cf.emailRange + " failed", e); - failed.add(new FailedEmail(cf.emailRange, e.toString())); - } - }); - return failed; + return futures; } - private ChunkFuture submit(List chunk) { - ChunkFuture chunkFuture = new ChunkFuture(chunk, - executorService.submit(() -> userDao.insertAndGetConflictEmails(chunk)) - ); - log.info("Submit " + chunkFuture.emailRange); - return chunkFuture; + private List submit(Map> chunk) throws Exception { + + Future> chunkFuture = executorService.submit( + () -> { + List failed = new ArrayList<>(); + List users = userDao.insertAndGetConflicts(new ArrayList<>(chunk.keySet())); + users.forEach(u -> { + failed.add("User with email = " + u.getEmail() + " already present"); + chunk.remove(u); + }); + // don't need failed messages for groups because "DELETE ON CASCADE" + chunk.forEach((User u, List groups) -> groupDao.updateGroups(u.getEmail(), groups)); + return failed; + }); + + log.info("Submit " + chunk.size() + " users"); + + return chunkFuture.get(); } }.call(); }