From ed16c08b48027573bae63cd482f0400b4ce555fd Mon Sep 17 00:00:00 2001 From: Tanechka Date: Thu, 6 Apr 2017 00:04:34 -0700 Subject: [PATCH 1/5] add cities, groups, projects to DB & Model --- .../masterjava/persist/model/City.java | 23 ++++++++++++++++++ .../masterjava/persist/model/Group.java | 24 +++++++++++++++++++ .../masterjava/persist/model/GroupType.java | 10 ++++++++ .../masterjava/persist/model/Project.java | 21 ++++++++++++++++ .../masterjava/persist/model/User.java | 5 ++-- .../masterjava/persist/model/UserFlag.java | 2 +- 6 files changed, 82 insertions(+), 3 deletions(-) create mode 100644 persist/src/main/java/ru/javaops/masterjava/persist/model/City.java create mode 100644 persist/src/main/java/ru/javaops/masterjava/persist/model/Group.java create mode 100644 persist/src/main/java/ru/javaops/masterjava/persist/model/GroupType.java create mode 100644 persist/src/main/java/ru/javaops/masterjava/persist/model/Project.java diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/City.java b/persist/src/main/java/ru/javaops/masterjava/persist/model/City.java new file mode 100644 index 0000000..f92a7dc --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/City.java @@ -0,0 +1,23 @@ +package ru.javaops.masterjava.persist.model; + +import com.bertoncelj.jdbi.entitymapper.Column; +import lombok.*; + +/** + * Created by val on 2017-04-05. + */ +@Data +@RequiredArgsConstructor +@EqualsAndHashCode(callSuper = true) +@NoArgsConstructor +public class City extends BaseEntity{ + + private @NonNull String value; + @Column("value_id") + private @NonNull String valueId; + + public City(Integer id, String value, String valueId) { + this(value, valueId); + this.id = id; + } +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/Group.java b/persist/src/main/java/ru/javaops/masterjava/persist/model/Group.java new file mode 100644 index 0000000..a1c30c0 --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/Group.java @@ -0,0 +1,24 @@ +package ru.javaops.masterjava.persist.model; + +import com.bertoncelj.jdbi.entitymapper.Column; +import lombok.*; + +/** + * Created by val on 2017-04-05. + */ +@Data +@RequiredArgsConstructor +@EqualsAndHashCode(callSuper = true) +@NoArgsConstructor +public class Group extends BaseEntity { + + private @NonNull String name; + private @NonNull GroupType type; + @Column("project_id") + private @NonNull Integer projectId; + + public Group(Integer id, String name, GroupType type, Integer projectId) { + this(name, type, projectId); + this.id = id; + } +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/GroupType.java b/persist/src/main/java/ru/javaops/masterjava/persist/model/GroupType.java new file mode 100644 index 0000000..ed0512f --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/GroupType.java @@ -0,0 +1,10 @@ +package ru.javaops.masterjava.persist.model; + +/** + * Created by val on 2017-04-05. + */ +public enum GroupType { + REGISTERING, + CURRENT, + FINISHED +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/Project.java b/persist/src/main/java/ru/javaops/masterjava/persist/model/Project.java new file mode 100644 index 0000000..edf0130 --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/Project.java @@ -0,0 +1,21 @@ +package ru.javaops.masterjava.persist.model; + +import lombok.*; + +/** + * Created by val on 2017-04-05. + */ +@Data +@RequiredArgsConstructor +@EqualsAndHashCode(callSuper = true) +@NoArgsConstructor +public class Project extends BaseEntity{ + + private @NonNull String description; + private @NonNull String name; + + public Project(Integer id, String description, String name) { + this(description, name); + this.id = id; + } +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/User.java b/persist/src/main/java/ru/javaops/masterjava/persist/model/User.java index 927a299..744a1e5 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/model/User.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/User.java @@ -12,9 +12,10 @@ public class User extends BaseEntity { private @NonNull String fullName; private @NonNull String email; private @NonNull UserFlag flag; + private @NonNull String city; - public User(Integer id, String fullName, String email, UserFlag flag) { - this(fullName, email, flag); + public User(Integer id, String fullName, String email, UserFlag flag, String city) { + this(fullName, email, flag, city); this.id=id; } } \ No newline at end of file diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/UserFlag.java b/persist/src/main/java/ru/javaops/masterjava/persist/model/UserFlag.java index bc2f691..31ecdf4 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/model/UserFlag.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/UserFlag.java @@ -7,5 +7,5 @@ public enum UserFlag { active, deleted, - superuser; + superuser } From 1ca48aa3a3b682eddf8a3dac2142c53ae01e97e9 Mon Sep 17 00:00:00 2001 From: Tanechka Date: Thu, 6 Apr 2017 21:06:31 -0700 Subject: [PATCH 2/5] Base HW --- .../masterjava/persist/dao/CityDao.java | 58 ++++++++++ .../masterjava/persist/dao/GroupDao.java | 61 ++++++++++ .../masterjava/persist/dao/ProjectDao.java | 62 +++++++++++ .../masterjava/persist/dao/UserDao.java | 11 +- .../masterjava/persist/model/Project.java | 3 + .../masterjava/persist/model/User.java | 3 + .../masterjava/persist/UserTestData.java | 12 +- .../persist/dao/ProjectDaoTest.java | 29 +++++ .../javaops/masterjava/export/CityExport.java | 74 +++++++++++++ .../masterjava/export/PayloadExport.java | 37 +++++++ .../masterjava/export/ProjectGroupExport.java | 104 ++++++++++++++++++ .../masterjava/export/UploadServlet.java | 10 +- .../javaops/masterjava/export/UserExport.java | 8 +- 13 files changed, 455 insertions(+), 17 deletions(-) create mode 100644 persist/src/main/java/ru/javaops/masterjava/persist/dao/CityDao.java create mode 100644 persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java create mode 100644 persist/src/main/java/ru/javaops/masterjava/persist/dao/ProjectDao.java create mode 100644 persist/src/test/java/ru/javaops/masterjava/persist/dao/ProjectDaoTest.java create mode 100644 web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java create mode 100644 web/export/src/main/java/ru/javaops/masterjava/export/PayloadExport.java create mode 100644 web/export/src/main/java/ru/javaops/masterjava/export/ProjectGroupExport.java diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/CityDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/CityDao.java new file mode 100644 index 0000000..d77ec63 --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/CityDao.java @@ -0,0 +1,58 @@ +package ru.javaops.masterjava.persist.dao; + +import one.util.streamex.IntStreamEx; +import org.skife.jdbi.v2.sqlobject.*; +import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize; +import ru.javaops.masterjava.persist.DBIProvider; +import ru.javaops.masterjava.persist.model.City; + +import java.util.List; + +/** + * Created by val on 2017-04-06. + */ +public abstract class CityDao implements AbstractDao { + + public City insert(City city) { + if (city.isNew()) { + int id = insertGeneratedId(city); + city.setId(id); + } else { + insertWitId(city); + } + return city; + } + + @SqlQuery("SELECT nextval('global_seq')") + abstract int getNextVal(); + + @Transaction + public int getSeqAndSkip(int step) { + int id = getNextVal(); + DBIProvider.getDBI().useHandle(h -> h.execute("ALTER SEQUENCE global_seq RESTART WITH " + (id + step))); + return id; + } + + @SqlUpdate("INSERT INTO cities (value, value_id) VALUES (:value, :valueId)") + @GetGeneratedKeys + abstract int insertGeneratedId(@BindBean City city); + + @SqlUpdate("INSERT INTO cities (id, value, value_id) VALUES (:id, :value, :valueId)") + abstract void insertWitId(@BindBean City city); + + @SqlBatch("INSERT INTO cities (id, value, value_id) VALUES (:id, :value, :valueId)" + + "ON CONFLICT DO NOTHING") + public abstract int[] insertBatch(@BindBean List cities, @BatchChunkSize int chunkSize); + + public List insertAndGetConflictValueId(List cities) { + int[] result = insertBatch(cities, cities.size()); + return IntStreamEx.range(0, cities.size()) + .filter(i -> result[i] == 0) + .mapToObj(index -> cities.get(index).getValueId()) + .toList(); + } + + @SqlUpdate("TRUNCATE cities") + @Override + public abstract void clean(); +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java new file mode 100644 index 0000000..84d21f5 --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java @@ -0,0 +1,61 @@ +package ru.javaops.masterjava.persist.dao; + +import com.bertoncelj.jdbi.entitymapper.EntityMapperFactory; +import one.util.streamex.IntStreamEx; +import org.skife.jdbi.v2.sqlobject.*; +import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize; +import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapperFactory; +import ru.javaops.masterjava.persist.DBIProvider; +import ru.javaops.masterjava.persist.model.Group; + +import java.util.List; + +/** + * Created by val on 2017-04-06. + */ +@RegisterMapperFactory(EntityMapperFactory.class) +public abstract class GroupDao implements AbstractDao{ + + public Group insert(Group group) { + if (group.isNew()) { + int id = insertGeneratedId(group); + group.setId(id); + } else { + insertWitId(group); + } + return group; + } + + @SqlQuery("SELECT nextval('global_seq')") + abstract int getNextVal(); + + @Transaction + public int getSeqAndSkip(int step) { + int id = getNextVal(); + DBIProvider.getDBI().useHandle(h -> h.execute("ALTER SEQUENCE global_seq RESTART WITH " + (id + step))); + return id; + } + + @SqlUpdate("INSERT INTO groups (name, type, project_id) VALUES (:name, CAST(:type AS group_type), :projectId)") + @GetGeneratedKeys + abstract int insertGeneratedId(@BindBean Group group); + + @SqlUpdate("INSERT INTO groups (id, name, type, project_id) VALUES (:id, :name, CAST(:type AS group_type), :projectId)") + abstract void insertWitId(@BindBean Group group); + + @SqlBatch("INSERT INTO groups (id, name, type, project_id) VALUES (:id, :name, CAST(:type AS group_type), :projectId)" + + "ON CONFLICT DO NOTHING") + public abstract int[] insertBatch(@BindBean List groups, @BatchChunkSize int chunkSize); + + public List insertAndGetConflictName(List groups) { + int[] result = insertBatch(groups, groups.size()); + return IntStreamEx.range(0, groups.size()) + .filter(i -> result[i] == 0) + .mapToObj(index -> groups.get(index).getName()) + .toList(); + } + + @SqlUpdate("TRUNCATE projects") + @Override + public abstract void clean(); +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/ProjectDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/ProjectDao.java new file mode 100644 index 0000000..86668f2 --- /dev/null +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/ProjectDao.java @@ -0,0 +1,62 @@ +package ru.javaops.masterjava.persist.dao; + +import com.bertoncelj.jdbi.entitymapper.EntityMapperFactory; +import one.util.streamex.IntStreamEx; +import org.skife.jdbi.v2.sqlobject.*; +import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize; +import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapperFactory; +import ru.javaops.masterjava.persist.DBIProvider; +import ru.javaops.masterjava.persist.model.Project; + +import java.util.List; + +/** + * Created by val on 2017-04-06. + */ +@RegisterMapperFactory(EntityMapperFactory.class) +public abstract class ProjectDao implements AbstractDao{ + + public Project insert(Project project) { + if (project.isNew()) { + int id = insertGeneratedId(project); + project.setId(id); + } else { + insertWitId(project); + } + return project; + } + + @SqlQuery("SELECT nextval('global_seq')") + abstract int getNextVal(); + + @Transaction + public int getSeqAndSkip(int step) { + int id = getNextVal(); + DBIProvider.getDBI().useHandle(h -> h.execute("ALTER SEQUENCE global_seq RESTART WITH " + (id + step))); + return id; + } + + @SqlUpdate("INSERT INTO projects (description, name) VALUES (:description, :name)") + @GetGeneratedKeys + abstract int insertGeneratedId(@BindBean Project project); + + @SqlQuery("INSERT INTO projects (id, description, name) VALUES (:id, :description, :name)" + + "ON CONFLICT ON CONSTRAINT name_unique DO UPDATE SET name=EXCLUDED.name RETURNING id") + public abstract int insertWitId(@BindBean Project project); + + @SqlBatch("INSERT INTO projects (id, description, name) VALUES (:id, :description, :name)" + + "ON CONFLICT DO NOTHING") + public abstract int[] insertBatch(@BindBean List projects, @BatchChunkSize int chunkSize); + + public List insertAndGetConflictName(List projects) { + int[] result = insertBatch(projects, projects.size()); + return IntStreamEx.range(0, projects.size()) + .filter(i -> result[i] == 0) + .mapToObj(index -> projects.get(index).getName()) + .toList(); + } + + @SqlUpdate("TRUNCATE projects") + @Override + public abstract void clean(); +} diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java index 1047ebe..fe65321 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java @@ -16,6 +16,7 @@ *

*

*/ +//TODO update group_user table @RegisterMapperFactory(EntityMapperFactory.class) public abstract class UserDao implements AbstractDao { @@ -29,21 +30,21 @@ public User insert(User user) { return user; } - @SqlQuery("SELECT nextval('user_seq')") + @SqlQuery("SELECT nextval('global_seq')") abstract int getNextVal(); @Transaction public int getSeqAndSkip(int step) { int id = getNextVal(); - DBIProvider.getDBI().useHandle(h -> h.execute("ALTER SEQUENCE user_seq RESTART WITH " + (id + step))); + DBIProvider.getDBI().useHandle(h -> h.execute("ALTER SEQUENCE global_seq RESTART WITH " + (id + step))); return id; } - @SqlUpdate("INSERT INTO users (full_name, email, flag) VALUES (:fullName, :email, CAST(:flag AS USER_FLAG)) ") + @SqlUpdate("INSERT INTO users (full_name, email, flag, city) VALUES (:fullName, :email, CAST(:flag AS USER_FLAG), :city) ") @GetGeneratedKeys abstract int insertGeneratedId(@BindBean User user); - @SqlUpdate("INSERT INTO users (id, full_name, email, flag) VALUES (:id, :fullName, :email, CAST(:flag AS USER_FLAG)) ") + @SqlUpdate("INSERT INTO users (id, full_name, email, flag, city) VALUES (:id, :fullName, :email, CAST(:flag AS USER_FLAG), :city) ") abstract void insertWitId(@BindBean User user); @SqlQuery("SELECT * FROM users ORDER BY full_name, email LIMIT :it") @@ -55,7 +56,7 @@ public int getSeqAndSkip(int step) { public abstract void clean(); // https://habrahabr.ru/post/264281/ - @SqlBatch("INSERT INTO users (id, full_name, email, flag) VALUES (:id, :fullName, :email, CAST(:flag AS USER_FLAG))" + + @SqlBatch("INSERT INTO users (id, full_name, email, flag, city) VALUES (:id, :fullName, :email, CAST(:flag AS USER_FLAG), :city)" + "ON CONFLICT DO NOTHING") // "ON CONFLICT (email) DO UPDATE SET full_name=:fullName, flag=CAST(:flag AS USER_FLAG)") public abstract int[] insertBatch(@BindBean List users, @BatchChunkSize int chunkSize); diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/Project.java b/persist/src/main/java/ru/javaops/masterjava/persist/model/Project.java index edf0130..123ef64 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/model/Project.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/Project.java @@ -2,6 +2,8 @@ import lombok.*; +import java.util.List; + /** * Created by val on 2017-04-05. */ @@ -13,6 +15,7 @@ public class Project extends BaseEntity{ private @NonNull String description; private @NonNull String name; + private List groups; public Project(Integer id, String description, String name) { this(description, name); diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/model/User.java b/persist/src/main/java/ru/javaops/masterjava/persist/model/User.java index 744a1e5..144c8db 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/model/User.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/model/User.java @@ -3,6 +3,8 @@ import com.bertoncelj.jdbi.entitymapper.Column; import lombok.*; +import java.util.List; + @Data @RequiredArgsConstructor @EqualsAndHashCode(callSuper = true) @@ -13,6 +15,7 @@ public class User extends BaseEntity { private @NonNull String email; private @NonNull UserFlag flag; private @NonNull String city; + private List groups; public User(Integer id, String fullName, String email, UserFlag flag, String city) { this(fullName, email, flag, city); diff --git a/persist/src/test/java/ru/javaops/masterjava/persist/UserTestData.java b/persist/src/test/java/ru/javaops/masterjava/persist/UserTestData.java index bc80bba..c0c933c 100644 --- a/persist/src/test/java/ru/javaops/masterjava/persist/UserTestData.java +++ b/persist/src/test/java/ru/javaops/masterjava/persist/UserTestData.java @@ -21,12 +21,12 @@ public class UserTestData { public static List FIST5_USERS; public static void init() { - ADMIN = new User("Admin", "admin@javaops.ru", UserFlag.superuser); - DELETED = new User("Deleted", "deleted@yandex.ru", UserFlag.deleted); - FULL_NAME = new User("Full Name", "gmail@gmail.com", UserFlag.active); - USER1 = new User("User1", "user1@gmail.com", UserFlag.active); - USER2 = new User("User2", "user2@yandex.ru", UserFlag.active); - USER3 = new User("User3", "user3@yandex.ru", UserFlag.active); + ADMIN = new User("Admin", "admin@javaops.ru", UserFlag.superuser, "msk"); + DELETED = new User("Deleted", "deleted@yandex.ru", UserFlag.deleted, "kiv"); + FULL_NAME = new User("Full Name", "gmail@gmail.com", UserFlag.active, "msk"); + USER1 = new User("User1", "user1@gmail.com", UserFlag.active, "kiv"); + USER2 = new User("User2", "user2@yandex.ru", UserFlag.active, "spb"); + USER3 = new User("User3", "user3@yandex.ru", UserFlag.active, "mnk"); FIST5_USERS = ImmutableList.of(ADMIN, DELETED, FULL_NAME, USER1, USER2); } diff --git a/persist/src/test/java/ru/javaops/masterjava/persist/dao/ProjectDaoTest.java b/persist/src/test/java/ru/javaops/masterjava/persist/dao/ProjectDaoTest.java new file mode 100644 index 0000000..189a79e --- /dev/null +++ b/persist/src/test/java/ru/javaops/masterjava/persist/dao/ProjectDaoTest.java @@ -0,0 +1,29 @@ +package ru.javaops.masterjava.persist.dao; + +import org.junit.Test; +import ru.javaops.masterjava.persist.model.Project; + +/** + * Created by val on 2017-04-06. + */ +public class ProjectDaoTest extends AbstractDaoTest{ + + public ProjectDaoTest() { + super(ProjectDao.class); + } + + @Test + public void insertGeneratedId() throws Exception { + Project project = new Project(100200, "Topjava", "topjava"); + int result = dao.insertGeneratedId(project); + System.out.println(result); + } + + @Test + public void insertWitId() throws Exception { + Project project = new Project(100201, "Topjava", "topjava"); + System.out.println(dao.insertWitId(project)); + + } + +} \ No newline at end of file diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java new file mode 100644 index 0000000..2d3725b --- /dev/null +++ b/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java @@ -0,0 +1,74 @@ +package ru.javaops.masterjava.export; + +import lombok.extern.slf4j.Slf4j; +import ru.javaops.masterjava.persist.DBIProvider; +import ru.javaops.masterjava.persist.dao.CityDao; +import ru.javaops.masterjava.persist.model.City; +import ru.javaops.masterjava.xml.util.StaxStreamProcessor; + +import javax.xml.stream.events.XMLEvent; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * Created by val on 2017-04-06. + */ +@Slf4j +public class CityExport { + + private static final int NUMBER_THREADS = 4; + private final ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_THREADS); + private final CityDao cityDao = DBIProvider.getDao(CityDao.class); + + public List process(final StaxStreamProcessor processor, int chunkSize) throws Exception { + + return new Callable>() { + + @Override + public List call() throws Exception { + + List futures = new ArrayList<>(); + int id = cityDao.getSeqAndSkip(chunkSize); + List chunk = new ArrayList<>(chunkSize); + + // Cities loop + String tag; + while ((tag = processor.doUntilAny(XMLEvent.START_ELEMENT, "City", "Users")) != null) { + + if (tag.equals("Users")) break; + + final String valueId = processor.getAttribute("id"); + final String value = processor.getReader().getElementText(); + + final City city = new City(id++, value, valueId); + chunk.add(city); + + if (chunk.size() == chunkSize) { + futures.addAll(submit(chunk)); + chunk = new ArrayList<>(chunkSize); + id = cityDao.getSeqAndSkip(chunkSize); + } + } + if (!chunk.isEmpty()) + futures.addAll(submit(chunk)); + return futures; + } + + private List submit(List cities) throws Exception { + Future> chunkFuture = executorService.submit(() -> { + List stringsResult = new ArrayList<>(); + cityDao.insertAndGetConflictValueId(cities) + .forEach(s -> stringsResult.add("city with id " + s + " already present")); + return stringsResult; + }); + log.info("Submit " + cities.size() + " cities"); + //return List with fail messages + return chunkFuture.get(); + } + }.call(); + } +} diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/PayloadExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/PayloadExport.java new file mode 100644 index 0000000..76cc183 --- /dev/null +++ b/web/export/src/main/java/ru/javaops/masterjava/export/PayloadExport.java @@ -0,0 +1,37 @@ +package ru.javaops.masterjava.export; + +import lombok.extern.slf4j.Slf4j; +import ru.javaops.masterjava.xml.util.StaxStreamProcessor; + +import javax.xml.stream.XMLStreamException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * Created by val on 2017-04-06. + */ +@Slf4j +public class PayloadExport { + + private final ProjectGroupExport projectGroupExport = new ProjectGroupExport(); + private final CityExport cityExport = new CityExport(); + private final UserExport userExport = new UserExport(); + + public List process (final InputStream is, int chunkSize) throws XMLStreamException { + log.info("Start processing with chunkSize=" + chunkSize); + + final StaxStreamProcessor processor = new StaxStreamProcessor(is); + List result = new ArrayList<>(); + + try { + result.addAll(projectGroupExport.process(processor, chunkSize)); + result.addAll(cityExport.process(processor, chunkSize)); + userExport.process(processor, chunkSize).forEach(e -> result.add(e.toString())); + } catch (Exception e) { + e.printStackTrace(); + } + + return result; + } +} diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/ProjectGroupExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/ProjectGroupExport.java new file mode 100644 index 0000000..8fa78ea --- /dev/null +++ b/web/export/src/main/java/ru/javaops/masterjava/export/ProjectGroupExport.java @@ -0,0 +1,104 @@ +package ru.javaops.masterjava.export; + +import lombok.extern.slf4j.Slf4j; +import ru.javaops.masterjava.persist.DBIProvider; +import ru.javaops.masterjava.persist.dao.GroupDao; +import ru.javaops.masterjava.persist.dao.ProjectDao; +import ru.javaops.masterjava.persist.model.Group; +import ru.javaops.masterjava.persist.model.GroupType; +import ru.javaops.masterjava.persist.model.Project; +import ru.javaops.masterjava.xml.util.StaxStreamProcessor; + +import javax.xml.stream.events.XMLEvent; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +/** + * Created by val on 2017-04-06. + */ +@Slf4j +public class ProjectGroupExport { + private static final int NUMBER_THREADS = 4; + private final ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_THREADS); + private final GroupDao groupDao = DBIProvider.getDao(GroupDao.class); + private final ProjectDao projectDao = DBIProvider.getDao(ProjectDao.class); + + + public List process(final StaxStreamProcessor processor, int chunkSize) throws Exception { + + return new Callable>() { + + @Override + public List call() throws Exception { + + List futures = new ArrayList<>(); + int id = projectDao.getSeqAndSkip(chunkSize); + + // Projects loop + String tag = processor.doUntilAny(XMLEvent.START_ELEMENT, "Project", "Cities"); + while (tag != null) { + + if (tag.equals("Cities")) break; + + final String name = processor.getAttribute("name"); + processor.doUntil(XMLEvent.START_ELEMENT, "description"); + final String description = processor.getReader().getElementText(); + + final Project project = new Project(id++, description, name); + + // Groups loop + List groups = new ArrayList<>(); + while ((tag = processor.doUntilAny(XMLEvent.START_ELEMENT, "Project", "Group", "Cities")) != null) { + + if (tag.equals("Cities") || tag.equals("Project")) break; + + final String groupName = processor.getAttribute("name"); + final GroupType type = GroupType.valueOf(processor.getAttribute("type")); + + groups.add(new Group(id++, groupName, type, project.getId())); + + if (groups.size() == chunkSize - 1) { + id = projectDao.getSeqAndSkip(chunkSize); + } + } + project.setGroups(groups); + + futures.addAll(submit(project)); + } + return futures; + } + + private List submit(Project project) throws Exception { + Future> chunkFuture = executorService.submit(() -> { + List stringsResult = new ArrayList<>(); + + //add project to DB + //insertWitId on conflict return id for row already present in DB + int resultId = projectDao.insertWitId(project); + if (resultId != project.getId()) { + stringsResult.add("project with name " + project.getName() + " already present"); + //change projectId for project & groups to actual + project.setId(resultId); + project.getGroups().forEach(g -> g.setProjectId(resultId)); + } + + //add groups to DB by chunk + List conflictNames = new ArrayList<>(); + int groupSize = project.getGroups().size(); + for (int i = 0; i < groupSize / chunkSize + 1; i++) { + int start = i * chunkSize; + int end = (i + 1) * chunkSize > groupSize ? groupSize : (i + 1) * chunkSize; + List groupsChunk = project.getGroups().subList(start, end); + conflictNames.addAll(groupDao.insertAndGetConflictName(groupsChunk)); + } + conflictNames.forEach(s -> stringsResult.add(" - group with name " + s + " for project " + project.getName() + " already present")); + return stringsResult; + }); + log.info("Submit project " + project.getName() + " with " + project.getGroups().size() + " groups"); + //return List with fail messages + return chunkFuture.get(); + } + }.call(); + } +} diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/UploadServlet.java b/web/export/src/main/java/ru/javaops/masterjava/export/UploadServlet.java index 9a8b98d..f3adad8 100644 --- a/web/export/src/main/java/ru/javaops/masterjava/export/UploadServlet.java +++ b/web/export/src/main/java/ru/javaops/masterjava/export/UploadServlet.java @@ -24,6 +24,7 @@ public class UploadServlet extends HttpServlet { private static final int CHUNK_SIZE = 2000; private final UserExport userExport = new UserExport(); + private final PayloadExport payloadExport = new PayloadExport(); @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { @@ -42,12 +43,19 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S } else { Part filePart = req.getPart("fileToUpload"); try (InputStream is = filePart.getInputStream()) { - List failed = userExport.process(is, chunkSize); + /*List failed = userExport.process(is, chunkSize); log.info("Failed users: " + failed); final WebContext webContext = new WebContext(req, resp, req.getServletContext(), req.getLocale(), ImmutableMap.of("failed", failed)); engine.process("result", webContext, resp.getWriter()); + return;*/ + List failed = payloadExport.process(is, chunkSize); + log.info("Failed uploads: " + failed); + final WebContext webContext = + new WebContext(req, resp, req.getServletContext(), req.getLocale(), + ImmutableMap.of("failed", failed)); + engine.process("result", webContext, resp.getWriter()); return; } } diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java index 0a2188b..be185ee 100644 --- a/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java +++ b/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java @@ -11,7 +11,6 @@ import javax.xml.stream.XMLStreamException; import javax.xml.stream.events.XMLEvent; -import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -41,8 +40,7 @@ public String toString() { } } - public List process(final InputStream is, int chunkSize) throws XMLStreamException { - log.info("Start proseccing with chunkSize=" + chunkSize); + public List process(final StaxStreamProcessor processor, int chunkSize) throws XMLStreamException { return new Callable>() { class ChunkFuture { @@ -64,13 +62,13 @@ public List call() throws XMLStreamException { int id = userDao.getSeqAndSkip(chunkSize); List chunk = new ArrayList<>(chunkSize); - final StaxStreamProcessor processor = new StaxStreamProcessor(is); while (processor.doUntil(XMLEvent.START_ELEMENT, "User")) { final String email = processor.getAttribute("email"); + final String city = processor.getAttribute("city"); final UserFlag flag = UserFlag.valueOf(processor.getAttribute("flag")); final String fullName = processor.getReader().getElementText(); - final User user = new User(id++, fullName, email, flag); + final User user = new User(id++, fullName, email, flag, city); chunk.add(user); if (chunk.size() == chunkSize) { futures.add(submit(chunk)); From 0a626fe0ee028342a1f3e608658bccbffb1dc20d Mon Sep 17 00:00:00 2001 From: Tanechka Date: Fri, 7 Apr 2017 00:21:07 -0700 Subject: [PATCH 3/5] optional HW --- .../masterjava/persist/dao/AbstractDao.java | 14 +++ .../masterjava/persist/dao/CityDao.java | 32 +------ .../masterjava/persist/dao/GroupDao.java | 47 ++++------ .../masterjava/persist/dao/ProjectDao.java | 45 +-------- .../masterjava/persist/dao/UserDao.java | 23 ++--- .../masterjava/persist/dao/GroupDaoTest.java | 12 +++ .../persist/dao/ProjectDaoTest.java | 7 -- .../javaops/masterjava/export/CityExport.java | 8 +- .../masterjava/export/PayloadExport.java | 11 ++- .../masterjava/export/ProjectGroupExport.java | 7 +- .../masterjava/export/UploadServlet.java | 9 -- .../javaops/masterjava/export/UserExport.java | 93 +++++++------------ 12 files changed, 109 insertions(+), 199 deletions(-) create mode 100644 persist/src/test/java/ru/javaops/masterjava/persist/dao/GroupDaoTest.java diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/AbstractDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/AbstractDao.java index f7e97e4..87c5e7d 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/AbstractDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/AbstractDao.java @@ -1,5 +1,9 @@ package ru.javaops.masterjava.persist.dao; +import org.skife.jdbi.v2.sqlobject.SqlQuery; +import org.skife.jdbi.v2.sqlobject.Transaction; +import ru.javaops.masterjava.persist.DBIProvider; + /** * gkislin * 27.10.2016 @@ -8,4 +12,14 @@ */ public interface AbstractDao { void clean(); + + @SqlQuery("SELECT nextval('global_seq')") + int getNextVal(); + + @Transaction + default int getSeqAndSkip(int step) { + int id = getNextVal(); + DBIProvider.getDBI().useHandle(h -> h.execute("ALTER SEQUENCE global_seq RESTART WITH " + (id + step))); + return id; + } } diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/CityDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/CityDao.java index d77ec63..25b2bc8 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/CityDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/CityDao.java @@ -1,9 +1,10 @@ package ru.javaops.masterjava.persist.dao; import one.util.streamex.IntStreamEx; -import org.skife.jdbi.v2.sqlobject.*; +import org.skife.jdbi.v2.sqlobject.BindBean; +import org.skife.jdbi.v2.sqlobject.SqlBatch; +import org.skife.jdbi.v2.sqlobject.SqlUpdate; import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize; -import ru.javaops.masterjava.persist.DBIProvider; import ru.javaops.masterjava.persist.model.City; import java.util.List; @@ -13,33 +14,6 @@ */ public abstract class CityDao implements AbstractDao { - public City insert(City city) { - if (city.isNew()) { - int id = insertGeneratedId(city); - city.setId(id); - } else { - insertWitId(city); - } - return city; - } - - @SqlQuery("SELECT nextval('global_seq')") - abstract int getNextVal(); - - @Transaction - public int getSeqAndSkip(int step) { - int id = getNextVal(); - DBIProvider.getDBI().useHandle(h -> h.execute("ALTER SEQUENCE global_seq RESTART WITH " + (id + step))); - return id; - } - - @SqlUpdate("INSERT INTO cities (value, value_id) VALUES (:value, :valueId)") - @GetGeneratedKeys - abstract int insertGeneratedId(@BindBean City city); - - @SqlUpdate("INSERT INTO cities (id, value, value_id) VALUES (:id, :value, :valueId)") - abstract void insertWitId(@BindBean City city); - @SqlBatch("INSERT INTO cities (id, value, value_id) VALUES (:id, :value, :valueId)" + "ON CONFLICT DO NOTHING") public abstract int[] insertBatch(@BindBean List cities, @BatchChunkSize int chunkSize); diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java index 84d21f5..925e39c 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java @@ -2,10 +2,12 @@ import com.bertoncelj.jdbi.entitymapper.EntityMapperFactory; import one.util.streamex.IntStreamEx; -import org.skife.jdbi.v2.sqlobject.*; +import org.skife.jdbi.v2.sqlobject.Bind; +import org.skife.jdbi.v2.sqlobject.BindBean; +import org.skife.jdbi.v2.sqlobject.SqlBatch; +import org.skife.jdbi.v2.sqlobject.SqlUpdate; import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize; import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapperFactory; -import ru.javaops.masterjava.persist.DBIProvider; import ru.javaops.masterjava.persist.model.Group; import java.util.List; @@ -16,37 +18,18 @@ @RegisterMapperFactory(EntityMapperFactory.class) public abstract class GroupDao implements AbstractDao{ - public Group insert(Group group) { - if (group.isNew()) { - int id = insertGeneratedId(group); - group.setId(id); - } else { - insertWitId(group); - } - return group; - } - - @SqlQuery("SELECT nextval('global_seq')") - abstract int getNextVal(); - - @Transaction - public int getSeqAndSkip(int step) { - int id = getNextVal(); - DBIProvider.getDBI().useHandle(h -> h.execute("ALTER SEQUENCE global_seq RESTART WITH " + (id + step))); - return id; - } - - @SqlUpdate("INSERT INTO groups (name, type, project_id) VALUES (:name, CAST(:type AS group_type), :projectId)") - @GetGeneratedKeys - abstract int insertGeneratedId(@BindBean Group group); - - @SqlUpdate("INSERT INTO groups (id, name, type, project_id) VALUES (:id, :name, CAST(:type AS group_type), :projectId)") - abstract void insertWitId(@BindBean Group group); + @SqlBatch("INSERT INTO groups (name) VALUES (:name)" + + "ON CONFLICT DO NOTHING") + public abstract int[] insertBatchByName(@BindBean List groups); @SqlBatch("INSERT INTO groups (id, name, type, project_id) VALUES (:id, :name, CAST(:type AS group_type), :projectId)" + "ON CONFLICT DO NOTHING") public abstract int[] insertBatch(@BindBean List groups, @BatchChunkSize int chunkSize); + @SqlBatch("INSERT INTO user_group (user_email, group_name) VALUES (:userEmail, :name)" + + "ON CONFLICT DO NOTHING") + public abstract int[] updateGroups(@Bind("userEmail") String email, @Bind("name") List groups); + public List insertAndGetConflictName(List groups) { int[] result = insertBatch(groups, groups.size()); return IntStreamEx.range(0, groups.size()) @@ -55,6 +38,14 @@ public List insertAndGetConflictName(List groups) { .toList(); } + public List insertByNameAndGetFailed(List groups) { + int[] result = insertBatchByName(groups); + return IntStreamEx.range(0, groups.size()) + .filter(i -> result[i] == 0) + .mapToObj(groups::get) + .toList(); + } + @SqlUpdate("TRUNCATE projects") @Override public abstract void clean(); diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/ProjectDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/ProjectDao.java index 86668f2..d3cea99 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/ProjectDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/ProjectDao.java @@ -1,61 +1,22 @@ package ru.javaops.masterjava.persist.dao; import com.bertoncelj.jdbi.entitymapper.EntityMapperFactory; -import one.util.streamex.IntStreamEx; -import org.skife.jdbi.v2.sqlobject.*; -import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize; +import org.skife.jdbi.v2.sqlobject.BindBean; +import org.skife.jdbi.v2.sqlobject.SqlQuery; +import org.skife.jdbi.v2.sqlobject.SqlUpdate; import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapperFactory; -import ru.javaops.masterjava.persist.DBIProvider; import ru.javaops.masterjava.persist.model.Project; -import java.util.List; - /** * Created by val on 2017-04-06. */ @RegisterMapperFactory(EntityMapperFactory.class) public abstract class ProjectDao implements AbstractDao{ - public Project insert(Project project) { - if (project.isNew()) { - int id = insertGeneratedId(project); - project.setId(id); - } else { - insertWitId(project); - } - return project; - } - - @SqlQuery("SELECT nextval('global_seq')") - abstract int getNextVal(); - - @Transaction - public int getSeqAndSkip(int step) { - int id = getNextVal(); - DBIProvider.getDBI().useHandle(h -> h.execute("ALTER SEQUENCE global_seq RESTART WITH " + (id + step))); - return id; - } - - @SqlUpdate("INSERT INTO projects (description, name) VALUES (:description, :name)") - @GetGeneratedKeys - abstract int insertGeneratedId(@BindBean Project project); - @SqlQuery("INSERT INTO projects (id, description, name) VALUES (:id, :description, :name)" + "ON CONFLICT ON CONSTRAINT name_unique DO UPDATE SET name=EXCLUDED.name RETURNING id") public abstract int insertWitId(@BindBean Project project); - @SqlBatch("INSERT INTO projects (id, description, name) VALUES (:id, :description, :name)" + - "ON CONFLICT DO NOTHING") - public abstract int[] insertBatch(@BindBean List projects, @BatchChunkSize int chunkSize); - - public List insertAndGetConflictName(List projects) { - int[] result = insertBatch(projects, projects.size()); - return IntStreamEx.range(0, projects.size()) - .filter(i -> result[i] == 0) - .mapToObj(index -> projects.get(index).getName()) - .toList(); - } - @SqlUpdate("TRUNCATE projects") @Override public abstract void clean(); diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java index fe65321..c9d549f 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java @@ -8,7 +8,7 @@ import ru.javaops.masterjava.persist.DBIProvider; import ru.javaops.masterjava.persist.model.User; -import java.util.List; +import java.util.*; /** * gkislin @@ -20,6 +20,8 @@ @RegisterMapperFactory(EntityMapperFactory.class) public abstract class UserDao implements AbstractDao { + private final GroupDao groupDao = DBIProvider.getDao(GroupDao.class); + public User insert(User user) { if (user.isNew()) { int id = insertGeneratedId(user); @@ -30,16 +32,6 @@ public User insert(User user) { return user; } - @SqlQuery("SELECT nextval('global_seq')") - abstract int getNextVal(); - - @Transaction - public int getSeqAndSkip(int step) { - int id = getNextVal(); - DBIProvider.getDBI().useHandle(h -> h.execute("ALTER SEQUENCE global_seq RESTART WITH " + (id + step))); - return id; - } - @SqlUpdate("INSERT INTO users (full_name, email, flag, city) VALUES (:fullName, :email, CAST(:flag AS USER_FLAG), :city) ") @GetGeneratedKeys abstract int insertGeneratedId(@BindBean User user); @@ -59,14 +51,17 @@ public int getSeqAndSkip(int step) { @SqlBatch("INSERT INTO users (id, full_name, email, flag, city) VALUES (:id, :fullName, :email, CAST(:flag AS USER_FLAG), :city)" + "ON CONFLICT DO NOTHING") // "ON CONFLICT (email) DO UPDATE SET full_name=:fullName, flag=CAST(:flag AS USER_FLAG)") - public abstract int[] insertBatch(@BindBean List users, @BatchChunkSize int chunkSize); + public abstract int[] insertBatch(@BindBean Collection users, @BatchChunkSize int chunkSize); - public List insertAndGetConflictEmails(List users) { + public List insertAndGetConflicts(List users) { int[] result = insertBatch(users, users.size()); return IntStreamEx.range(0, users.size()) .filter(i -> result[i] == 0) - .mapToObj(index -> users.get(index).getEmail()) + .mapToObj(users::get) .toList(); } + + + } diff --git a/persist/src/test/java/ru/javaops/masterjava/persist/dao/GroupDaoTest.java b/persist/src/test/java/ru/javaops/masterjava/persist/dao/GroupDaoTest.java new file mode 100644 index 0000000..547300f --- /dev/null +++ b/persist/src/test/java/ru/javaops/masterjava/persist/dao/GroupDaoTest.java @@ -0,0 +1,12 @@ +package ru.javaops.masterjava.persist.dao; + +/** + * Created by val on 2017-04-06. + */ +public class GroupDaoTest extends AbstractDaoTest { + + public GroupDaoTest() { + super(GroupDao.class); + } + +} \ No newline at end of file diff --git a/persist/src/test/java/ru/javaops/masterjava/persist/dao/ProjectDaoTest.java b/persist/src/test/java/ru/javaops/masterjava/persist/dao/ProjectDaoTest.java index 189a79e..2cc020a 100644 --- a/persist/src/test/java/ru/javaops/masterjava/persist/dao/ProjectDaoTest.java +++ b/persist/src/test/java/ru/javaops/masterjava/persist/dao/ProjectDaoTest.java @@ -12,13 +12,6 @@ public ProjectDaoTest() { super(ProjectDao.class); } - @Test - public void insertGeneratedId() throws Exception { - Project project = new Project(100200, "Topjava", "topjava"); - int result = dao.insertGeneratedId(project); - System.out.println(result); - } - @Test public void insertWitId() throws Exception { Project project = new Project(100201, "Topjava", "topjava"); diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java index 2d3725b..6a238f8 100644 --- a/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java +++ b/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java @@ -11,7 +11,6 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; /** @@ -19,12 +18,11 @@ */ @Slf4j public class CityExport { - - private static final int NUMBER_THREADS = 4; - private final ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_THREADS); private final CityDao cityDao = DBIProvider.getDao(CityDao.class); - public List process(final StaxStreamProcessor processor, int chunkSize) throws Exception { + public List process(final ExecutorService executorService, + final StaxStreamProcessor processor, + int chunkSize) throws Exception { return new Callable>() { diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/PayloadExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/PayloadExport.java index 76cc183..03a86a0 100644 --- a/web/export/src/main/java/ru/javaops/masterjava/export/PayloadExport.java +++ b/web/export/src/main/java/ru/javaops/masterjava/export/PayloadExport.java @@ -7,6 +7,8 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Created by val on 2017-04-06. @@ -14,6 +16,9 @@ @Slf4j public class PayloadExport { + private static final int NUMBER_THREADS = 4; + private final ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_THREADS); + private final ProjectGroupExport projectGroupExport = new ProjectGroupExport(); private final CityExport cityExport = new CityExport(); private final UserExport userExport = new UserExport(); @@ -25,9 +30,9 @@ public List process (final InputStream is, int chunkSize) throws XMLStre List result = new ArrayList<>(); try { - result.addAll(projectGroupExport.process(processor, chunkSize)); - result.addAll(cityExport.process(processor, chunkSize)); - userExport.process(processor, chunkSize).forEach(e -> result.add(e.toString())); + result.addAll(projectGroupExport.process(executorService, processor, chunkSize)); + result.addAll(cityExport.process(executorService, processor, chunkSize)); + result.addAll(userExport.process(executorService, processor, chunkSize)); } catch (Exception e) { e.printStackTrace(); } diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/ProjectGroupExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/ProjectGroupExport.java index 8fa78ea..ffb7073 100644 --- a/web/export/src/main/java/ru/javaops/masterjava/export/ProjectGroupExport.java +++ b/web/export/src/main/java/ru/javaops/masterjava/export/ProjectGroupExport.java @@ -19,13 +19,12 @@ */ @Slf4j public class ProjectGroupExport { - private static final int NUMBER_THREADS = 4; - private final ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_THREADS); private final GroupDao groupDao = DBIProvider.getDao(GroupDao.class); private final ProjectDao projectDao = DBIProvider.getDao(ProjectDao.class); - - public List process(final StaxStreamProcessor processor, int chunkSize) throws Exception { + public List process(final ExecutorService executorService, + final StaxStreamProcessor processor, + int chunkSize) throws Exception { return new Callable>() { diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/UploadServlet.java b/web/export/src/main/java/ru/javaops/masterjava/export/UploadServlet.java index f3adad8..8a67c21 100644 --- a/web/export/src/main/java/ru/javaops/masterjava/export/UploadServlet.java +++ b/web/export/src/main/java/ru/javaops/masterjava/export/UploadServlet.java @@ -22,8 +22,6 @@ @Slf4j public class UploadServlet extends HttpServlet { private static final int CHUNK_SIZE = 2000; - - private final UserExport userExport = new UserExport(); private final PayloadExport payloadExport = new PayloadExport(); @Override @@ -43,13 +41,6 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S } else { Part filePart = req.getPart("fileToUpload"); try (InputStream is = filePart.getInputStream()) { - /*List failed = userExport.process(is, chunkSize); - log.info("Failed users: " + failed); - final WebContext webContext = - new WebContext(req, resp, req.getServletContext(), req.getLocale(), - ImmutableMap.of("failed", failed)); - engine.process("result", webContext, resp.getWriter()); - return;*/ List failed = payloadExport.process(is, chunkSize); log.info("Failed uploads: " + failed); final WebContext webContext = diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java index be185ee..0523036 100644 --- a/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java +++ b/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java @@ -1,21 +1,18 @@ package ru.javaops.masterjava.export; -import lombok.Value; import lombok.extern.slf4j.Slf4j; -import one.util.streamex.StreamEx; +import one.util.streamex.IntStreamEx; import ru.javaops.masterjava.persist.DBIProvider; +import ru.javaops.masterjava.persist.dao.GroupDao; import ru.javaops.masterjava.persist.dao.UserDao; import ru.javaops.masterjava.persist.model.User; import ru.javaops.masterjava.persist.model.UserFlag; import ru.javaops.masterjava.xml.util.StaxStreamProcessor; -import javax.xml.stream.XMLStreamException; import javax.xml.stream.events.XMLEvent; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; /** @@ -24,82 +21,62 @@ */ @Slf4j public class UserExport { - - private static final int NUMBER_THREADS = 4; - private final ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_THREADS); private final UserDao userDao = DBIProvider.getDao(UserDao.class); + private final GroupDao groupDao = DBIProvider.getDao(GroupDao.class); - @Value - public static class FailedEmail { - public String emailOrRange; - public String reason; - - @Override - public String toString() { - return emailOrRange + " : " + reason; - } - } - - public List process(final StaxStreamProcessor processor, int chunkSize) throws XMLStreamException { + public List process(final ExecutorService executorService, + final StaxStreamProcessor processor, + int chunkSize) throws Exception { - return new Callable>() { - class ChunkFuture { - String emailRange; - Future> future; - - public ChunkFuture(List chunk, Future> future) { - this.future = future; - this.emailRange = chunk.get(0).getEmail(); - if (chunk.size() > 1) { - this.emailRange += '-' + chunk.get(chunk.size() - 1).getEmail(); - } - } - } + return new Callable>() { @Override - public List call() throws XMLStreamException { - List futures = new ArrayList<>(); + public List call() throws Exception { + List futures = new ArrayList<>(); int id = userDao.getSeqAndSkip(chunkSize); - List chunk = new ArrayList<>(chunkSize); + Map> chunk = new HashMap<>(chunkSize); while (processor.doUntil(XMLEvent.START_ELEMENT, "User")) { final String email = processor.getAttribute("email"); final String city = processor.getAttribute("city"); + String groupRefs = processor.getAttribute("groupRefs"); + final String[] groups = groupRefs != null ? groupRefs.split("\\s+") : new String[0]; final UserFlag flag = UserFlag.valueOf(processor.getAttribute("flag")); final String fullName = processor.getReader().getElementText(); final User user = new User(id++, fullName, email, flag, city); - chunk.add(user); + chunk.put(user, Arrays.asList(groups)); if (chunk.size() == chunkSize) { - futures.add(submit(chunk)); - chunk = new ArrayList<>(chunkSize); + futures.addAll(submit(chunk)); + chunk = new HashMap<>(chunkSize); id = userDao.getSeqAndSkip(chunkSize); } } if (!chunk.isEmpty()) { - futures.add(submit(chunk)); + futures.addAll(submit(chunk)); } + return futures; + } - List failed = new ArrayList<>(); - futures.forEach(cf -> { - try { - failed.addAll(StreamEx.of(cf.future.get()).map(email -> new FailedEmail(email, "already present")).toList()); - log.info(cf.emailRange + " successfully executed"); - } catch (Exception e) { - log.error(cf.emailRange + " failed", e); - failed.add(new FailedEmail(cf.emailRange, e.toString())); - } + private List submit(Map> chunk) throws Exception { + List failed = new ArrayList<>(); + Future> chunkFuture = executorService.submit( + () -> userDao.insertAndGetConflicts(new ArrayList<>(chunk.keySet()))); + chunkFuture.get().forEach(u -> { + failed.add("User with email = " + u.getEmail() + " already present"); + chunk.remove(u); }); - return failed; - } - private ChunkFuture submit(List chunk) { - ChunkFuture chunkFuture = new ChunkFuture(chunk, - executorService.submit(() -> userDao.insertAndGetConflictEmails(chunk)) - ); - log.info("Submit " + chunkFuture.emailRange); - return chunkFuture; + //actually we don't use it because we have "ON DELETE CASCADE ON UPDATE CASCADE" in user_group table + chunk.forEach((User u, List l) -> { + int[] result = groupDao.updateGroups(u.getEmail(), l); + IntStreamEx.range(0, l.size()) + .filter(i -> result[i] == 0) + .forEach(s -> failed.add("failed to add group " + s + " for User " + u.getEmail())); + }); + log.info("Submit " + chunk.size() + " users"); + return failed; } }.call(); } From 372e4047fcdbf2db2c6af7c07439b1dc357cb03f Mon Sep 17 00:00:00 2001 From: Tanechka Date: Fri, 7 Apr 2017 12:58:36 -0700 Subject: [PATCH 4/5] optional clean DAO & refactor UserExport --- .../masterjava/persist/dao/GroupDao.java | 12 ------ .../masterjava/persist/dao/UserDao.java | 7 +--- .../javaops/masterjava/export/CityExport.java | 2 +- .../javaops/masterjava/export/UserExport.java | 41 +++++++++---------- 4 files changed, 23 insertions(+), 39 deletions(-) diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java index 925e39c..a2be108 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/GroupDao.java @@ -18,10 +18,6 @@ @RegisterMapperFactory(EntityMapperFactory.class) public abstract class GroupDao implements AbstractDao{ - @SqlBatch("INSERT INTO groups (name) VALUES (:name)" + - "ON CONFLICT DO NOTHING") - public abstract int[] insertBatchByName(@BindBean List groups); - @SqlBatch("INSERT INTO groups (id, name, type, project_id) VALUES (:id, :name, CAST(:type AS group_type), :projectId)" + "ON CONFLICT DO NOTHING") public abstract int[] insertBatch(@BindBean List groups, @BatchChunkSize int chunkSize); @@ -38,14 +34,6 @@ public List insertAndGetConflictName(List groups) { .toList(); } - public List insertByNameAndGetFailed(List groups) { - int[] result = insertBatchByName(groups); - return IntStreamEx.range(0, groups.size()) - .filter(i -> result[i] == 0) - .mapToObj(groups::get) - .toList(); - } - @SqlUpdate("TRUNCATE projects") @Override public abstract void clean(); diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java index c9d549f..e6945c4 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java @@ -5,10 +5,10 @@ import org.skife.jdbi.v2.sqlobject.*; import org.skife.jdbi.v2.sqlobject.customizers.BatchChunkSize; import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapperFactory; -import ru.javaops.masterjava.persist.DBIProvider; import ru.javaops.masterjava.persist.model.User; -import java.util.*; +import java.util.Collection; +import java.util.List; /** * gkislin @@ -16,12 +16,9 @@ *

*

*/ -//TODO update group_user table @RegisterMapperFactory(EntityMapperFactory.class) public abstract class UserDao implements AbstractDao { - private final GroupDao groupDao = DBIProvider.getDao(GroupDao.class); - public User insert(User user) { if (user.isNew()) { int id = insertGeneratedId(user); diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java index 6a238f8..9db386b 100644 --- a/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java +++ b/web/export/src/main/java/ru/javaops/masterjava/export/CityExport.java @@ -40,7 +40,7 @@ public List call() throws Exception { if (tag.equals("Users")) break; final String valueId = processor.getAttribute("id"); - final String value = processor.getReader().getElementText(); + final String value = processor.getText(); final City city = new City(id++, value, valueId); chunk.add(city); diff --git a/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java b/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java index 0523036..d1bafdf 100644 --- a/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java +++ b/web/export/src/main/java/ru/javaops/masterjava/export/UserExport.java @@ -1,7 +1,7 @@ package ru.javaops.masterjava.export; +import com.google.common.base.Splitter; import lombok.extern.slf4j.Slf4j; -import one.util.streamex.IntStreamEx; import ru.javaops.masterjava.persist.DBIProvider; import ru.javaops.masterjava.persist.dao.GroupDao; import ru.javaops.masterjava.persist.dao.UserDao; @@ -25,8 +25,8 @@ public class UserExport { private final GroupDao groupDao = DBIProvider.getDao(GroupDao.class); public List process(final ExecutorService executorService, - final StaxStreamProcessor processor, - int chunkSize) throws Exception { + final StaxStreamProcessor processor, + int chunkSize) throws Exception { return new Callable>() { @@ -41,11 +41,10 @@ public List call() throws Exception { final String email = processor.getAttribute("email"); final String city = processor.getAttribute("city"); String groupRefs = processor.getAttribute("groupRefs"); - final String[] groups = groupRefs != null ? groupRefs.split("\\s+") : new String[0]; final UserFlag flag = UserFlag.valueOf(processor.getAttribute("flag")); - final String fullName = processor.getReader().getElementText(); + final String fullName = processor.getText(); final User user = new User(id++, fullName, email, flag, city); - chunk.put(user, Arrays.asList(groups)); + chunk.put(user, Splitter.on(' ').splitToList(groupRefs)); if (chunk.size() == chunkSize) { futures.addAll(submit(chunk)); chunk = new HashMap<>(chunkSize); @@ -60,23 +59,23 @@ public List call() throws Exception { } private List submit(Map> chunk) throws Exception { - List failed = new ArrayList<>(); - Future> chunkFuture = executorService.submit( - () -> userDao.insertAndGetConflicts(new ArrayList<>(chunk.keySet()))); - chunkFuture.get().forEach(u -> { - failed.add("User with email = " + u.getEmail() + " already present"); - chunk.remove(u); - }); - //actually we don't use it because we have "ON DELETE CASCADE ON UPDATE CASCADE" in user_group table - chunk.forEach((User u, List l) -> { - int[] result = groupDao.updateGroups(u.getEmail(), l); - IntStreamEx.range(0, l.size()) - .filter(i -> result[i] == 0) - .forEach(s -> failed.add("failed to add group " + s + " for User " + u.getEmail())); - }); + Future> chunkFuture = executorService.submit( + () -> { + List failed = new ArrayList<>(); + List users = userDao.insertAndGetConflicts(new ArrayList<>(chunk.keySet())); + users.forEach(u -> { + failed.add("User with email = " + u.getEmail() + " already present"); + chunk.remove(u); + }); + // don't need failed messages for groups because "DELETE ON CASCADE" + chunk.forEach((User u, List groups) -> groupDao.updateGroups(u.getEmail(), groups)); + return failed; + }); + log.info("Submit " + chunk.size() + " users"); - return failed; + + return chunkFuture.get(); } }.call(); } From 23c5daa1b735c63c7d6fb987326118ec51185fe2 Mon Sep 17 00:00:00 2001 From: Tanechka Date: Fri, 7 Apr 2017 20:43:45 -0700 Subject: [PATCH 5/5] optional add insertBatchWithCity --- .../javaops/masterjava/persist/dao/UserDao.java | 9 ++++++++- .../masterjava/persist/UserTestData.java | 12 ++++++------ .../masterjava/persist/dao/UserDaoTest.java | 17 +++++++++++------ 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java index e6945c4..787ebeb 100644 --- a/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java +++ b/persist/src/main/java/ru/javaops/masterjava/persist/dao/UserDao.java @@ -40,7 +40,7 @@ public User insert(User user) { public abstract List getWithLimit(@Bind int limit); // http://stackoverflow.com/questions/13223820/postgresql-delete-all-content - @SqlUpdate("TRUNCATE users") + @SqlUpdate("TRUNCATE users CASCADE") @Override public abstract void clean(); @@ -51,6 +51,13 @@ public User insert(User user) { public abstract int[] insertBatch(@BindBean Collection users, @BatchChunkSize int chunkSize); + @SqlBatch("INSERT INTO users (id, full_name, email, flag, city) " + + "SELECT :id, :fullName, :email, CAST(:flag AS USER_FLAG), :city " + + "WHERE (SELECT value_id FROM cities WHERE value_id = :city) IS NOT NULL " + + "ON CONFLICT DO NOTHING") + public abstract int[] insertBatchWithCity(@BindBean Collection users, @BatchChunkSize int chunkSize); + + public List insertAndGetConflicts(List users) { int[] result = insertBatch(users, users.size()); return IntStreamEx.range(0, users.size()) diff --git a/persist/src/test/java/ru/javaops/masterjava/persist/UserTestData.java b/persist/src/test/java/ru/javaops/masterjava/persist/UserTestData.java index c0c933c..4a3f671 100644 --- a/persist/src/test/java/ru/javaops/masterjava/persist/UserTestData.java +++ b/persist/src/test/java/ru/javaops/masterjava/persist/UserTestData.java @@ -21,12 +21,12 @@ public class UserTestData { public static List FIST5_USERS; public static void init() { - ADMIN = new User("Admin", "admin@javaops.ru", UserFlag.superuser, "msk"); - DELETED = new User("Deleted", "deleted@yandex.ru", UserFlag.deleted, "kiv"); - FULL_NAME = new User("Full Name", "gmail@gmail.com", UserFlag.active, "msk"); - USER1 = new User("User1", "user1@gmail.com", UserFlag.active, "kiv"); - USER2 = new User("User2", "user2@yandex.ru", UserFlag.active, "spb"); - USER3 = new User("User3", "user3@yandex.ru", UserFlag.active, "mnk"); + ADMIN = new User(100001,"Admin", "admin@javaops.ru", UserFlag.superuser, "msk"); + DELETED = new User(100002,"Deleted", "deleted@yandex.ru", UserFlag.deleted, "kiv"); + FULL_NAME = new User(100003,"Full Name", "gmail@gmail.com", UserFlag.active, "msk"); + USER1 = new User(100004, "User1", "user1@gmail.com", UserFlag.active, "kiv"); + USER2 = new User(100005, "User2", "user2@yandex.ru", UserFlag.active, "spb"); + USER3 = new User(100006, "User3", "user3@yandex.ru", UserFlag.active, "mnk"); FIST5_USERS = ImmutableList.of(ADMIN, DELETED, FULL_NAME, USER1, USER2); } diff --git a/persist/src/test/java/ru/javaops/masterjava/persist/dao/UserDaoTest.java b/persist/src/test/java/ru/javaops/masterjava/persist/dao/UserDaoTest.java index 1dd6ed4..aa9c5f0 100644 --- a/persist/src/test/java/ru/javaops/masterjava/persist/dao/UserDaoTest.java +++ b/persist/src/test/java/ru/javaops/masterjava/persist/dao/UserDaoTest.java @@ -1,13 +1,9 @@ package ru.javaops.masterjava.persist.dao; import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import ru.javaops.masterjava.persist.UserTestData; -import ru.javaops.masterjava.persist.model.User; - -import java.util.List; import static ru.javaops.masterjava.persist.UserTestData.FIST5_USERS; @@ -26,7 +22,7 @@ public static void init() throws Exception { UserTestData.init(); } - @Before + /*@Before public void setUp() throws Exception { UserTestData.setUp(); } @@ -42,7 +38,7 @@ public void insertBatch() throws Exception { dao.clean(); dao.insertBatch(FIST5_USERS, 3); Assert.assertEquals(5, dao.getWithLimit(100).size()); - } + }*/ @Test public void getSeqAndSkip() throws Exception { @@ -50,4 +46,13 @@ public void getSeqAndSkip() throws Exception { int seq2 = dao.getSeqAndSkip(1); Assert.assertEquals(5, seq2 - seq1); } + + @Test + public void insertBatchWithCity() throws Exception { + for (int x: dao.insertBatchWithCity(FIST5_USERS, 3)) { + System.out.println(x); + } + + Assert.assertEquals(3, dao.getWithLimit(100).size()); + } } \ No newline at end of file