diff --git a/.classpath b/.classpath index 072def0..dcd814b 100644 --- a/.classpath +++ b/.classpath @@ -12,14 +12,14 @@ - + - + - + diff --git a/pom.xml b/pom.xml index 540d1be..7d578b3 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ Upload big file by java. Using Apache HttpComponents. Can work on Android too. - 4.2.5 + 4.3.6 @@ -35,4 +35,4 @@ - \ No newline at end of file + diff --git a/src/main/java/cn/clxy/upload/ApacheHCUploader.java b/src/main/java/cn/clxy/upload/ApacheHCUploader.java index 8d058b7..a24dbbf 100644 --- a/src/main/java/cn/clxy/upload/ApacheHCUploader.java +++ b/src/main/java/cn/clxy/upload/ApacheHCUploader.java @@ -30,8 +30,6 @@ import org.apache.http.params.HttpConnectionParams; import org.apache.http.params.HttpParams; -import cn.clxy.upload.UploadFileService.Part; - /** * Use http client to upload. * @author clxy @@ -44,11 +42,11 @@ public class ApacheHCUploader implements Uploader { @Override public void upload(Part part) { - String fileName = part.getFileName(); + String partName = part.getName(); Map params = new HashMap(); - params.put(Config.KEY_FILE, new ByteArrayBody(part.getContent(), fileName)); + params.put(Config.keyFile, new ByteArrayBody(part.getContent(), partName)); post(params); - log.debug(fileName + " uploaded."); + log.debug(partName + " uploaded."); } @Override @@ -56,8 +54,8 @@ public void done(String fileName, long partCount) { Map params = new HashMap(); try { - params.put(Config.KEY_FILE_NAME, new StringBody(fileName)); - params.put(Config.PART_COUNT, new StringBody(String.valueOf(partCount))); + params.put(Config.keyFileName, new StringBody(fileName)); + params.put(Config.keyPartCount, new StringBody(String.valueOf(partCount))); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } @@ -68,7 +66,7 @@ public void done(String fileName, long partCount) { private void post(Map params) { - HttpPost post = new HttpPost(Config.URL); + HttpPost post = new HttpPost(Config.url); MultipartEntity entity = new MultipartEntity(); for (Entry e : params.entrySet()) { entity.addPart(e.getKey(), e.getValue()); @@ -100,11 +98,11 @@ private static HttpClient createClient() { schReg.register(new Scheme("https", 443, SSLSocketFactory.getSocketFactory())); PoolingClientConnectionManager ccm = new PoolingClientConnectionManager(schReg); - ccm.setMaxTotal(Config.MAX_UPLOAD); + ccm.setMaxTotal(Config.maxUpload); HttpParams params = new BasicHttpParams(); HttpConnectionParams.setConnectionTimeout(params, 10 * 1000); - HttpConnectionParams.setSoTimeout(params, Config.PART_UPLOAD_TIMEOUT); + HttpConnectionParams.setSoTimeout(params, Config.timeOut); return new DefaultHttpClient(ccm, params); } diff --git a/src/main/java/cn/clxy/upload/Config.java b/src/main/java/cn/clxy/upload/Config.java index 724efd0..ba6eb93 100644 --- a/src/main/java/cn/clxy/upload/Config.java +++ b/src/main/java/cn/clxy/upload/Config.java @@ -5,20 +5,54 @@ */ package cn.clxy.upload; +import java.io.InputStream; +import java.util.Properties; + public class Config { - public static final String URL = "http://192.168.1.99:8080/ssm/common/upload"; + public static String url = "http://192.168.1.99:8080/ssm/common/upload"; // Keys used by server. - public static final String KEY_FILE = "file"; - public static final String KEY_FILE_NAME = "fileName"; - public static final String PART_COUNT = "partCount"; + public static String keyFile = "file"; + public static String keyFileName = "fileName"; + public static String keyPartCount = "partCount"; // Upload threads and timeout per thread. Should be adjusted by network condition. - public static final int MAX_UPLOAD = 5; - public static final int PART_UPLOAD_TIMEOUT = 120 * 1000; + public static int maxUpload = 5; + public static int timeOut = 120 * 1000; // The size of part. - public static final int PART_SIZE = 100 * 1024; - public static final int MAX_READ = 5; + public static int partSize = 100 * 1024; + public static int maxRead = 5; + + static { + // load properties + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + try { + InputStream is = cl.getResourceAsStream("bigFileUpload.properties"); + if (is != null) { + Properties p = new Properties(); + p.load(is); + url = p.getProperty("url"); + keyFile = p.getProperty("keyFile"); + keyFileName = p.getProperty("keyFileName"); + keyPartCount = p.getProperty("keyPartCount"); + + String s = p.getProperty("maxUpload"); + maxUpload = Integer.parseInt(s); + s = p.getProperty("timeOut"); + timeOut = Integer.parseInt(s) * 1000; + s = p.getProperty("partSize"); + partSize = Integer.parseInt(s) * 1024; + s = p.getProperty("maxRead"); + maxRead = Integer.parseInt(s); + + } + } catch (Exception e) { + // do nothing. + } + } + + private Config() { + } } diff --git a/src/main/java/cn/clxy/upload/Listener.java b/src/main/java/cn/clxy/upload/Listener.java new file mode 100644 index 0000000..d57b7b6 --- /dev/null +++ b/src/main/java/cn/clxy/upload/Listener.java @@ -0,0 +1,60 @@ +package cn.clxy.upload; + +public interface Listener { + + void onStart(Object info); + + void onRead(Object info); + + void onUpload(Object info); + + void onPartDone(Object info); + + void onNotify(); + + void onFail(Object info); + + void onSuccess(); + + public static class Default implements Listener { + + @Override + public void onStart(Object info) { + onMessage("Start uploading " + info + " part(s)."); + } + + @Override + public void onRead(Object info) { + onMessage("Reading part " + info + "."); + } + + @Override + public void onUpload(Object info) { + onMessage("Uploading part " + info + "."); + } + + @Override + public void onPartDone(Object info) { + onMessage("Part " + info + " is done."); + } + + @Override + public void onNotify() { + onMessage("Notifying."); + } + + @Override + public void onFail(Object info) { + onMessage(info + " failed."); + } + + @Override + public void onSuccess() { + onMessage("Success."); + } + + protected void onMessage(String msg) { + System.out.println(msg); + } + } +} diff --git a/src/main/java/cn/clxy/upload/NotifyTask.java b/src/main/java/cn/clxy/upload/NotifyTask.java deleted file mode 100644 index 81ed7be..0000000 --- a/src/main/java/cn/clxy/upload/NotifyTask.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Copyright (C) 2013 CLXY Studio. - * This content is released under the (Link Goes Here) MIT License. - * http://en.wikipedia.org/wiki/MIT_License - */ -package cn.clxy.upload; - -import java.io.File; -import java.util.concurrent.Callable; - -public class NotifyTask implements Callable { - - private File file; - private Uploader uploader; - - public NotifyTask(File file, Uploader uploader) { - this.file = file; - this.uploader = uploader; - } - - @Override - public String call() throws Exception { - - long length = file.length(); - long partCount = (length / Config.PART_SIZE) + (length % Config.PART_SIZE == 0 ? 0 : 1); - uploader.done(file.getName(), partCount); - return "notify"; - } -} diff --git a/src/main/java/cn/clxy/upload/Part.java b/src/main/java/cn/clxy/upload/Part.java new file mode 100644 index 0000000..7ae54bf --- /dev/null +++ b/src/main/java/cn/clxy/upload/Part.java @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2013 CLXY Studio. + * This content is released under the (Link Goes Here) MIT License. + * http://en.wikipedia.org/wiki/MIT_License + */ +package cn.clxy.upload; + +public class Part { + + private byte[] content; + private String name; + public static final Part NULL = new Part(); + + public Part() { + this(null, null); + } + + public Part(String name, byte[] content) { + this.content = content; + this.name = name; + } + + public byte[] getContent() { + return content; + } + + public String getName() { + return name; + } +} diff --git a/src/main/java/cn/clxy/upload/ReadTask.java b/src/main/java/cn/clxy/upload/ReadTask.java deleted file mode 100644 index d35bc1d..0000000 --- a/src/main/java/cn/clxy/upload/ReadTask.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Copyright (C) 2013 CLXY Studio. - * This content is released under the (Link Goes Here) MIT License. - * http://en.wikipedia.org/wiki/MIT_License - */ -package cn.clxy.upload; - -import java.io.File; -import java.io.FileInputStream; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; - -import cn.clxy.upload.UploadFileService.Part; - -/** - * Read big file into several parts. - * @author clxy - */ -public class ReadTask implements Callable { - - private File file; - private BlockingQueue parts; - - /** - * Limited part indexes. Only these parts will be read if specified. - */ - private List indexes; - - public ReadTask(File file, List indexes, BlockingQueue parts) { - this.file = file; - this.parts = parts; - this.indexes = indexes; - } - - @Override - public String call() throws Exception { - - FileInputStream fis = null; - String fileName = file.getName(); - int partSize = Config.PART_SIZE; - - try { - fis = new FileInputStream(file); - FileChannel fc = fis.getChannel(); - for (int i = 0;; i++) { - Status status = getReadStatus(i, indexes); - if (status == Status.stop) { - break; - } - - if (status == Status.skip) { - fc.position(fc.position() + partSize); - continue; - } - - if (status == Status.read) { - ByteBuffer bb = ByteBuffer.allocate(partSize); - int bytesRead = fc.read(bb); - if (bytesRead == -1) { - break; - } - byte[] bytes = bb.array(); - if (bytesRead != partSize) {// trim - bytes = Arrays.copyOf(bytes, bytesRead); - } - parts.put(new Part(createFileName(fileName, i), bytes)); - } - } - } finally { - if (fis != null) { - try { - fis.close(); - } catch (Exception e) { - } - } - parts.put(Part.NULL);// put end signal. - } - - return "read"; - } - - /** - * Create file name of part.
- * bigfile.avi = [bigfile.avi.0, bigfile.avi.1, bigfile.avi.2 ...] - * @param fileName - * @param i - * @return - */ - protected String createFileName(String fileName, int i) { - return fileName + "." + i;//start by 0. - // return fileName + (i == 0 ? "" : ("." + i)); - } - - private Status getReadStatus(int i, List indexes) { - - if (indexes == null || indexes.contains(i)) { - return Status.read; - } - - if (i > indexes.get(indexes.size() - 1)) { - return Status.stop; - } - - return Status.skip; - } - - private static enum Status { - stop, skip, read - } -} diff --git a/src/main/java/cn/clxy/upload/UploadFileService.java b/src/main/java/cn/clxy/upload/UploadFileService.java index ba1deca..7e82283 100644 --- a/src/main/java/cn/clxy/upload/UploadFileService.java +++ b/src/main/java/cn/clxy/upload/UploadFileService.java @@ -6,11 +6,15 @@ package cn.clxy.upload; import java.io.File; +import java.io.FileInputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; @@ -24,8 +28,12 @@ public class UploadFileService { private File file; + private BlockingQueue parts; + private List indexes; + + private Listener listener = new Listener.Default(); private Uploader uploader = new ApacheHCUploader(); - private ExecutorService executor = Executors.newFixedThreadPool(Config.MAX_UPLOAD); + private ExecutorService executor = Executors.newFixedThreadPool(Config.maxUpload); private static final Log log = LogFactory.getLog(UploadFileService.class); @@ -38,20 +46,20 @@ public UploadFileService(String fileName) { public void upload() { try { - doUpload(null); + doUpload(); } finally { stop(); } } - public void retry(Integer... indexes) { + public void retry(Integer... array) { // sort first. - List list = Arrays.asList(indexes); - Collections.sort(list); + indexes = Arrays.asList(array); + Collections.sort(indexes); try { - doUpload(list); + doUpload(); } finally { stop(); } @@ -63,24 +71,20 @@ public void stop() { } } - private void doUpload(final List indexes) { - - log.debug("Start! ===--------------------"); + private void doUpload() { - BlockingQueue parts = new ArrayBlockingQueue(Config.MAX_READ); + listener.onStart(indexes != null ? indexes.size() : getPartCount()); + parts = new ArrayBlockingQueue(Config.maxRead); CompletionService cs = new ExecutorCompletionService(executor); - log.debug("Reading started."); - cs.submit(new ReadTask(file, indexes, parts)); + cs.submit(readTask); - log.debug("Uploading started."); - - for (int i = 0; i < Config.MAX_UPLOAD; i++) { - cs.submit(new UploadTask("upload." + i, uploader, parts)); + for (int i = 0; i < Config.maxUpload; i++) { + cs.submit(new UploadTask("upload." + i)); } // Wait all done. total count = maxUpload + 1. - for (int i = 0; i <= Config.MAX_UPLOAD; i++) { + for (int i = 0; i <= Config.maxUpload; i++) { Future future = null; try { future = cs.take(); @@ -91,47 +95,147 @@ private void doUpload(final List indexes) { } // Notify sever all done. - Future result = executor.submit(new NotifyTask(file, uploader)); + Future result = executor.submit(notifyTask); checkFuture(result); - - log.debug("End! ===--------------------"); + listener.onSuccess(); } - private static String checkFuture(Future future) { + private String checkFuture(Future future) { + String result = null; try { - String result = future.get(); - log.debug(result + " is done."); + result = future.get(); return result; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } catch (ExecutionException e) { + listener.onFail(result); + log.error(e.getCause()); throw new RuntimeException(e.getCause()); } } - public static class Part { + protected int getPartCount() { + long length = file.length(); + long count = (length / Config.partSize) + (length % Config.partSize == 0 ? 0 : 1); + return (int) count; + } + + private Callable readTask = new Callable() { + + @Override + public String call() throws Exception { - private byte[] content; - private String fileName; - public static final Part NULL = new Part(); + FileInputStream fis = null; + String fileName = file.getName(); + int partSize = Config.partSize; - public Part() { - this(null, null); + try { + fis = new FileInputStream(file); + FileChannel fc = fis.getChannel(); + for (int i = 0;; i++) { + ReadStatus status = getReadStatus(i, indexes); + if (status == ReadStatus.stop) { + break; + } + + if (status == ReadStatus.skip) { + fc.position(fc.position() + partSize); + continue; + } + + if (status == ReadStatus.read) { + ByteBuffer bb = ByteBuffer.allocate(partSize); + int bytesRead = fc.read(bb); + if (bytesRead == -1) { + break; + } + byte[] bytes = bb.array(); + if (bytesRead != partSize) {// trim + bytes = Arrays.copyOf(bytes, bytesRead); + } + String partName = createFileName(fileName, i); + listener.onRead(partName); + parts.put(new Part(partName, bytes)); + } + } + } finally { + if (fis != null) { + try { + fis.close(); + } catch (Exception e) { + } + } + parts.put(Part.NULL);// put end signal. + } + + return "read"; } - public Part(String fileName, byte[] content) { - this.content = content; - this.fileName = fileName; + /** + * Create file name of part.
+ * bigfile.avi = [bigfile.avi.0, bigfile.avi.1, bigfile.avi.2 ...] + * @param fileName + * @param i + * @return + */ + protected String createFileName(String fileName, int i) { + return fileName + "." + i;// start by 0. + // return fileName + (i == 0 ? "" : ("." + i)); } - public byte[] getContent() { - return content; + private ReadStatus getReadStatus(int i, List indexes) { + + if (indexes == null || indexes.contains(i)) { + return ReadStatus.read; + } + + if (i > indexes.get(indexes.size() - 1)) { + return ReadStatus.stop; + } + + return ReadStatus.skip; } + }; + + private static enum ReadStatus { + stop, skip, read + } + + private class UploadTask implements Callable { - public String getFileName() { - return fileName; + private String name; + + public UploadTask(String name) { + this.name = name; + } + + @Override + public String call() throws Exception { + + while (true) { + + Part part = parts.take(); + if (part == Part.NULL) { + parts.add(Part.NULL);// notify others to stop. + break; + } + + String partName = part.getName(); + listener.onUpload(partName); + uploader.upload(part); + listener.onPartDone(partName); + } + return name; } } + + private Callable notifyTask = new Callable() { + @Override + public String call() throws Exception { + uploader.done(file.getName(), getPartCount()); + return "notify"; + } + }; } diff --git a/src/main/java/cn/clxy/upload/UploadTask.java b/src/main/java/cn/clxy/upload/UploadTask.java deleted file mode 100644 index 40a76ee..0000000 --- a/src/main/java/cn/clxy/upload/UploadTask.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright (C) 2013 CLXY Studio. - * This content is released under the (Link Goes Here) MIT License. - * http://en.wikipedia.org/wiki/MIT_License - */ -package cn.clxy.upload; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; - -import cn.clxy.upload.UploadFileService.Part; - -public class UploadTask implements Callable { - - private String name; - private Uploader uploader; - private BlockingQueue parts; - - public UploadTask(String name, Uploader uploader, BlockingQueue parts) { - this.name = name; - this.uploader = uploader; - this.parts = parts; - } - - @Override - public String call() throws Exception { - while (true) { - Part part = parts.take(); - if (part == Part.NULL) { - parts.add(Part.NULL);// notify others to stop. - break; - } else { - uploader.upload(part); - } - } - return name; - } -} \ No newline at end of file diff --git a/src/main/java/cn/clxy/upload/Uploader.java b/src/main/java/cn/clxy/upload/Uploader.java index 65e253a..1a4ed7d 100644 --- a/src/main/java/cn/clxy/upload/Uploader.java +++ b/src/main/java/cn/clxy/upload/Uploader.java @@ -5,8 +5,6 @@ */ package cn.clxy.upload; -import cn.clxy.upload.UploadFileService.Part; - public interface Uploader { /** diff --git a/src/main/resources/bigFileUpload.properties b/src/main/resources/bigFileUpload.properties new file mode 100644 index 0000000..255b917 --- /dev/null +++ b/src/main/resources/bigFileUpload.properties @@ -0,0 +1,13 @@ +url=http://192.168.1.99:8080/ssm/common/upload + +keyFile=file +keyFileName=fileName +keyPartCount=partCount + +maxUpload=5 +#unit:second +timeOut=120 + +#unit:k +partSize=100 +maxRead=5