diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 73f69e0..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml -# Editor-based HTTP Client requests -/httpRequests/ diff --git a/.idea/CodeReadingNote.xml b/.idea/CodeReadingNote.xml deleted file mode 100644 index 2a0133f..0000000 --- a/.idea/CodeReadingNote.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - - - - \ No newline at end of file diff --git a/.idea/ObjectStorage.iml b/.idea/ObjectStorage.iml deleted file mode 100644 index c956989..0000000 --- a/.idea/ObjectStorage.iml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml deleted file mode 100644 index 28a804d..0000000 --- a/.idea/misc.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index 638b027..0000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 94a25f7..0000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml deleted file mode 100644 index c7a0bbe..0000000 --- a/.idea/workspace.xml +++ /dev/null @@ -1,317 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 1607694590815 - - - 1607778089100 - - - 1607858454909 - - - 1607929674883 - - - - - - - - - - - true - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/README.md b/README.md index df88038..fad261c 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,2 @@ -# ObjectStorage 分布式对象存储项目开发 - -## 简介 -项目主要分为apiServer端和dataServer端,apiServer向外提供restful接口,负责接受客户的连接请求。 -dataServer负责存储数据。rabbitmq用于apiServer和dataServer端之间心跳、文件定位等。Elasticearch用于存储元数据。 - -## 主要特点 -### 数据校验和去重 -主要思想,计算文件的sha256哈希值,以哈希值代替文件名进行存储,元数据服务器存储的映射。客户上传数据时会将哈希作为http头部数据一起上传,服务器接受时会计算哈希,并对比。同时 -由于哈希的唯一性,重复数据只保留一份。 -### 数据冗余和即时修复 -用Reed Solomon纠删码进行冗余,每份数据分成若干份,每份存储在互不相同的结点上。读取的时候进行文件完整性检查和修复。 -修复 - -### 断点续传 -支持断点下载和上传。下载时http头部包含偏移量。上传为了支持哈希校验,在上传完成后才开始校验。 - -### gzip压缩 -节省空间,数据服务器存储存储压缩数据,下载时解压之后再发送出去。 - -## 改进 -加入Nginx进行负载均衡 +# ObjectStorage +分布式对象存储项目开发 diff --git a/apiServer/apiServer.go b/apiServer/apiServer.go deleted file mode 100644 index 9dc93f5..0000000 --- a/apiServer/apiServer.go +++ /dev/null @@ -1,26 +0,0 @@ -package main - -import ( - "ObjectStorage/apiServer/heartbeat" - "ObjectStorage/apiServer/locate" - "ObjectStorage/apiServer/object" - "ObjectStorage/apiServer/temp" - "ObjectStorage/apiServer/versions" - "log" - "net/http" - "os" -) - -func main() { - go heartbeat.ListenHeartBeat() - //相当于PUT上传数据 - http.HandleFunc("/objects/", object.Handler) - //相当于GET请求数据 - http.HandleFunc("/locate/", locate.Handler) - //Get版本所有信息,直接查询元数据服务器 - http.HandleFunc("/versions/", versions.Handler) - // - http.HandleFunc("/temp/", temp.Handler) - - log.Fatalln(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil)) -} diff --git a/apiServer/heartbeat/choose.go b/apiServer/heartbeat/choose.go deleted file mode 100644 index 9d50ec5..0000000 --- a/apiServer/heartbeat/choose.go +++ /dev/null @@ -1,30 +0,0 @@ -package heartbeat - -import "math/rand" - -func ChooseRandomDataServers(n int, exclude map[int]string) (ds []string) { - candidates := make([]string, 0) - //exclude the server which has the data - reverseExcludeServers := make(map[string]int) - for id, addr := range exclude { - reverseExcludeServers[addr] = id - } - //get current data servers that online - servers := GetDataServers() - for _, s := range servers { - //judge the server exclude or include - _, excluded := reverseExcludeServers[s] - if !excluded { - candidates = append(candidates, s) - } - } - length := len(candidates) - if length < n { - return - } - p := rand.Perm(length) - for i := 0; i < n; i++ { - ds = append(ds, candidates[p[i]]) - } - return -} diff --git a/apiServer/heartbeat/heartbeat.go b/apiServer/heartbeat/heartbeat.go deleted file mode 100644 index 6087e7b..0000000 --- a/apiServer/heartbeat/heartbeat.go +++ /dev/null @@ -1,51 +0,0 @@ -package heartbeat - -import ( - "ObjectStorage/src/lib/rabbitmq" - "os" - "strconv" - "sync" - "time" -) - -var dataServers = make(map[string]time.Time) -var mutex sync.Mutex - -func ListenHeartBeat() { - q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER")) - defer q.Close() - q.Bind("apiServers") - go removeExpiredDataServers() - c := q.Consume() - for msg := range c { - dataServer, err := strconv.Unquote(string(msg.Body)) - if err != nil { - panic(err) - } - mutex.Lock() - dataServers[dataServer] = time.Now() - mutex.Unlock() - } -} - -func removeExpiredDataServers() { - for { - time.Sleep(5 * time.Second) - mutex.Lock() - for s, t := range dataServers { - if t.Add(10 * time.Second).Before(time.Now()) { - delete(dataServers, s) - } - } - mutex.Unlock() - } -} -func GetDataServers() []string { - mutex.Lock() - defer mutex.Unlock() - servers := make([]string, 0) - for s, _ := range dataServers { - servers = append(servers, s) - } - return servers -} diff --git a/apiServer/locate/handler.go b/apiServer/locate/handler.go deleted file mode 100644 index 1310b5d..0000000 --- a/apiServer/locate/handler.go +++ /dev/null @@ -1,22 +0,0 @@ -package locate - -import ( - "encoding/json" - "net/http" - "strings" -) - -func Handler(w http.ResponseWriter, r *http.Request) { - m := r.Method - if m != http.MethodGet { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - info := Locate(strings.Split(r.URL.EscapedPath(), "/")[2]) - if len(info) == 0 { - w.WriteHeader(http.StatusNotFound) - return - } - b, _ := json.Marshal(info) - w.Write(b) -} diff --git a/apiServer/locate/locate.go b/apiServer/locate/locate.go deleted file mode 100644 index efeffd5..0000000 --- a/apiServer/locate/locate.go +++ /dev/null @@ -1,36 +0,0 @@ -package locate - -import ( - "ObjectStorage/src/lib/rabbitmq" - "ObjectStorage/src/lib/rs" - "ObjectStorage/src/lib/types" - "encoding/json" - "os" - "time" -) - -func Locate(name string) map[int]string { - q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER")) - q.Publish("dataServers", name) - c := q.Consume() - go func() { - time.Sleep(1 * time.Second) - q.Close() - }() - - locateInfo := make(map[int]string) - for i := 0; i < rs.ALL_SHARDS; i++ { - msg := <-c - if len(msg.Body) == 0 { - return locateInfo - } - var info types.LocateMessage - json.Unmarshal(msg.Body, &info) - locateInfo[info.Id] = info.Addr - } - return locateInfo -} - -func Exist(name string) bool { - return len(Locate(name)) >= rs.DATA_SHARDS -} diff --git a/apiServer/object/delete.go b/apiServer/object/delete.go deleted file mode 100644 index 7ed6fb7..0000000 --- a/apiServer/object/delete.go +++ /dev/null @@ -1,27 +0,0 @@ -package object - -import ( - "ObjectStorage/src/lib/es" - "log" - "net/http" - "strings" -) - -func del(w http.ResponseWriter, r *http.Request) { - name := strings.Split(r.URL.EscapedPath(), "/")[2] - //获取文件的最新元数据 - meta, err := es.SearchLatestVersion(name) - if err != nil { - log.Println(err) - w.WriteHeader(http.StatusInternalServerError) - return - } - //获取元数据成功,添加新的元数据,标识此数据被删除 - err = es.PutMetadata(name, meta.Version+1, 0, "") - if err != nil { - log.Println(err) - w.WriteHeader(http.StatusInternalServerError) - return - } - -} diff --git a/apiServer/object/get.go b/apiServer/object/get.go deleted file mode 100644 index 7582ff4..0000000 --- a/apiServer/object/get.go +++ /dev/null @@ -1,69 +0,0 @@ -package object - -import ( - "ObjectStorage/src/lib/es" - "ObjectStorage/src/lib/utils" - "compress/gzip" - "fmt" - "io" - "log" - "net/http" - "net/url" - "strconv" - "strings" -) - -func get(w http.ResponseWriter, r *http.Request) { - name := strings.Split(r.URL.EscapedPath(), "/")[2] - versionId := r.URL.Query()["version"] - version := 0 - var e error - if len(versionId) != 0 { - version, e = strconv.Atoi(versionId[0]) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusBadRequest) - return - } - } - meta, e := es.GetMetadata(name, version) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - return - } - if meta.Hash == "" { - w.WriteHeader(http.StatusNotFound) - return - } - hash := url.PathEscape(meta.Hash) - stream, e := GetStream(hash, meta.Size) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusNotFound) - return - } - offset := utils.GetOffsetFromHeader(r.Header) - if offset != 0 { - stream.Seek(offset, io.SeekCurrent) - w.Header().Set("content-range", fmt.Sprintf("bytes %d-%d/%d", offset, meta.Size-1, meta.Size)) - w.WriteHeader(http.StatusPartialContent) - } - acceptGzip := false - encoding := r.Header["Accept-Encoding"] - for i := range encoding { - if encoding[i] == "gzip" { - acceptGzip = true - break - } - } - if acceptGzip { - w.Header().Set("content-encoding", "gzip") - w2 := gzip.NewWriter(w) - io.Copy(w2, stream) - w2.Close() - } else { - io.Copy(w, stream) - } - stream.Close() -} diff --git a/apiServer/object/get_stream.go b/apiServer/object/get_stream.go deleted file mode 100644 index 4747d2b..0000000 --- a/apiServer/object/get_stream.go +++ /dev/null @@ -1,24 +0,0 @@ -package object - -import ( - "ObjectStorage/apiServer/heartbeat" - "ObjectStorage/apiServer/locate" - _ "ObjectStorage/src/lib/objectstream" - "ObjectStorage/src/lib/rs" - "fmt" - _ "io" -) - -func GetStream(hash string, size int64) (*rs.RSGetStream, error) { - locateInfo := locate.Locate(hash) - //data loss completely - if len(locateInfo) < rs.DATA_SHARDS { - return nil, fmt.Errorf("object %s locate fail,result is %v", hash, locateInfo) - } - //data loss partly,choose some other data servers - dataServers := make([]string, 0) - if len(locateInfo) != rs.ALL_SHARDS { - dataServers = heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS-len(locateInfo), locateInfo) - } - return rs.NewRSGetStream(locateInfo, dataServers, hash, size) -} diff --git a/apiServer/object/handler.go b/apiServer/object/handler.go deleted file mode 100644 index aeefcf3..0000000 --- a/apiServer/object/handler.go +++ /dev/null @@ -1,24 +0,0 @@ -package object - -import "net/http" - -func Handler(w http.ResponseWriter, r *http.Request) { - m := r.Method - if m == http.MethodPut { - put(w, r) - return - } - if m == http.MethodGet { - get(w, r) - return - } - if m == http.MethodDelete { - del(w, r) - return - } - if m == http.MethodPost { - post(w, r) - return - } - w.WriteHeader(http.StatusMethodNotAllowed) -} diff --git a/apiServer/object/post.go b/apiServer/object/post.go deleted file mode 100644 index cb8cffe..0000000 --- a/apiServer/object/post.go +++ /dev/null @@ -1,54 +0,0 @@ -package object - -import ( - "ObjectStorage/apiServer/heartbeat" - "ObjectStorage/apiServer/locate" - "ObjectStorage/src/lib/es" - "ObjectStorage/src/lib/rs" - "ObjectStorage/src/lib/utils" - "log" - "net/http" - "net/url" - "strconv" - "strings" -) - -func post(w http.ResponseWriter, r *http.Request) { - name := strings.Split(r.URL.EscapedPath(), "/")[2] - size, e := strconv.ParseInt(r.Header.Get("size"), 0, 64) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusForbidden) - return - } - hash := utils.GetHashFromHeader(r.Header) - if hash == "" { - log.Println("missing object hash in digest header") - w.WriteHeader(http.StatusBadRequest) - return - } - if locate.Exist(url.PathEscape(hash)) { - e = es.AddVersion(name, hash, size) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - } else { - w.WriteHeader(http.StatusOK) - } - return - } - ds := heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS, nil) - if len(ds) != rs.ALL_SHARDS { - log.Println("cannot find enough dataServer") - w.WriteHeader(http.StatusServiceUnavailable) - return - } - stream, e := rs.NewRSResumablePutStream(ds, name, url.PathEscape(hash), size) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - return - } - w.Header().Set("location", "/temp/"+url.PathEscape(stream.ToToken())) - w.WriteHeader(http.StatusCreated) -} diff --git a/apiServer/object/put.go b/apiServer/object/put.go deleted file mode 100644 index 3e103ce..0000000 --- a/apiServer/object/put.go +++ /dev/null @@ -1,37 +0,0 @@ -package object - -import ( - "ObjectStorage/src/lib/es" - "ObjectStorage/src/lib/utils" - "log" - "net/http" - "strings" -) - -func put(w http.ResponseWriter, r *http.Request) { - hash := utils.GetHashFromHeader(r.Header) - if hash == "" { - log.Println("missing object hash in digest header") - w.WriteHeader(http.StatusBadRequest) - return - } - - size := utils.GetSizeFromHeader(r.Header) - c, e := storeObject(r.Body, hash, size) - if e != nil { - log.Println(e) - w.WriteHeader(c) - return - } - if c != http.StatusOK { - w.WriteHeader(c) - return - } - - name := strings.Split(r.URL.EscapedPath(), "/")[2] - e = es.AddVersion(name, hash, size) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - } -} diff --git a/apiServer/object/put_stream.go b/apiServer/object/put_stream.go deleted file mode 100644 index 639f1e8..0000000 --- a/apiServer/object/put_stream.go +++ /dev/null @@ -1,16 +0,0 @@ -package object - -import ( - "ObjectStorage/apiServer/heartbeat" - "ObjectStorage/src/lib/rs" - "fmt" -) - -func putStream(hash string, size int64) (*rs.RSPutStream, error) { - servers := heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS, nil) - if len(servers) != rs.ALL_SHARDS { - return nil, fmt.Errorf("cannot find enough dataServer") - } - - return rs.NewRSPutStream(servers, hash, size) -} diff --git a/apiServer/object/store.go b/apiServer/object/store.go deleted file mode 100644 index 0a460f2..0000000 --- a/apiServer/object/store.go +++ /dev/null @@ -1,31 +0,0 @@ -package object - -import ( - "ObjectStorage/apiServer/locate" - "ObjectStorage/src/lib/utils" - "fmt" - "io" - "net/http" - "net/url" -) - -func storeObject(r io.Reader, hash string, size int64) (int, error) { - //locate time consume 1s,because our rabbitmq timeout is 1s. - if locate.Exist(url.PathEscape(hash)) { - return http.StatusOK, nil - } - - stream, e := putStream(url.PathEscape(hash), size) - if e != nil { - return http.StatusInternalServerError, e - } - - reader := io.TeeReader(r, stream) - d := utils.CalculateHash(reader) - if d != hash { - stream.Commit(false) - return http.StatusBadRequest, fmt.Errorf("object hash mismatch, calculated=%s, requested=%s", d, hash) - } - stream.Commit(true) - return http.StatusOK, nil -} diff --git a/apiServer/temp/handler.go b/apiServer/temp/handler.go deleted file mode 100644 index e506263..0000000 --- a/apiServer/temp/handler.go +++ /dev/null @@ -1,16 +0,0 @@ -package temp - -import "net/http" - -func Handler(w http.ResponseWriter, r *http.Request) { - method := r.Method - if method == http.MethodPut { - put(w, r) - return - } - if method == http.MethodHead { - head(w, r) - return - } - w.WriteHeader(http.StatusMethodNotAllowed) -} diff --git a/apiServer/temp/head.go b/apiServer/temp/head.go deleted file mode 100644 index 0d67297..0000000 --- a/apiServer/temp/head.go +++ /dev/null @@ -1,25 +0,0 @@ -package temp - -import ( - "ObjectStorage/src/lib/rs" - "fmt" - "log" - "net/http" - "strings" -) - -func head(w http.ResponseWriter, r *http.Request) { - token := strings.Split(r.URL.EscapedPath(), "/")[2] - stream, e := rs.NewRSResumablePutStreamFromToken(token) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusForbidden) - return - } - current := stream.CurrentSize() - if current == -1 { - w.WriteHeader(http.StatusNotFound) - return - } - w.Header().Set("content-length", fmt.Sprintf("%d", current)) -} diff --git a/apiServer/temp/put.go b/apiServer/temp/put.go deleted file mode 100644 index 86e3a44..0000000 --- a/apiServer/temp/put.go +++ /dev/null @@ -1,75 +0,0 @@ -package temp - -import ( - "ObjectStorage/apiServer/locate" - "ObjectStorage/src/lib/es" - "ObjectStorage/src/lib/rs" - "ObjectStorage/src/lib/utils" - "io" - "log" - "net/http" - "net/url" - "strings" -) - -func put(w http.ResponseWriter, r *http.Request) { - token := strings.Split(r.URL.EscapedPath(), "/")[2] - stream, e := rs.NewRSResumablePutStreamFromToken(token) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusForbidden) - return - } - current := stream.CurrentSize() - if current == -1 { - w.WriteHeader(http.StatusNotFound) - return - } - offset := utils.GetOffsetFromHeader(r.Header) - if current != offset { - w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) - return - } - bytes := make([]byte, rs.BLOCK_SIZE) - for { - n, e := io.ReadFull(r.Body, bytes) - if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - return - } - current += int64(n) - if current > stream.Size { - stream.Commit(false) - log.Println("resumable put exceed size") - w.WriteHeader(http.StatusForbidden) - return - } - if n != rs.BLOCK_SIZE && current != stream.Size { - return - } - stream.Write(bytes[:n]) - if current == stream.Size { - stream.Flush() - getStream, e := rs.NewRSResumableGetStream(stream.Servers, stream.Uuids, stream.Size) - hash := url.PathEscape(utils.CalculateHash(getStream)) - if hash != stream.Hash { - stream.Commit(false) - log.Println("resumable put done but hash mismatch") - w.WriteHeader(http.StatusForbidden) - return - } - if locate.Exist(url.PathEscape(hash)) { - stream.Commit(false) - } else { - stream.Commit(true) - } - e = es.AddVersion(stream.Name, stream.Hash, stream.Size) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - } - return - } - } -} diff --git a/apiServer/versions/handler.go b/apiServer/versions/handler.go deleted file mode 100644 index 14ce733..0000000 --- a/apiServer/versions/handler.go +++ /dev/null @@ -1,38 +0,0 @@ -package versions - -import ( - "ObjectStorage/src/lib/es" - "encoding/json" - "log" - "net/http" - "strings" -) - -func Handler(w http.ResponseWriter, r *http.Request) { - m := r.Method - if m != http.MethodGet { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - from := 0 - size := 1000 - name := strings.Split(r.URL.EscapedPath(), "/")[2] - for { - metas, err := es.SearchAllVersions(name, from, size) - if err != nil { - log.Println(err) - w.WriteHeader(http.StatusInternalServerError) - return - } - for i := range metas { - b, _ := json.Marshal(metas[i]) - w.Write(b) - w.Write([]byte("\n")) - - } - if len(metas) != size { - return - } - from += size - } -} diff --git a/createfile.sh b/createfile.sh deleted file mode 100644 index bd0127a..0000000 --- a/createfile.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - -for i in `seq 1 6` -do - mkdir -p /tmp/$i/objects - mkdir -p /tmp/$i/temp -done diff --git a/dataServer/ObjectsRoot/objects/test b/dataServer/ObjectsRoot/objects/test deleted file mode 100644 index c577e48..0000000 --- a/dataServer/ObjectsRoot/objects/test +++ /dev/null @@ -1 +0,0 @@ -this is a test object \ No newline at end of file diff --git a/dataServer/dataServer.go b/dataServer/dataServer.go deleted file mode 100644 index f6b720f..0000000 --- a/dataServer/dataServer.go +++ /dev/null @@ -1,26 +0,0 @@ -package main - -import ( - "ObjectStorage/dataServer/heartbeat" - "ObjectStorage/dataServer/locate" - "ObjectStorage/dataServer/object" - "ObjectStorage/dataServer/temp" - "log" - "net/http" - "os" -) - -func main() { - locate.CollectObjects() - //发送心跳 - go heartbeat.StartHeartBeat() - //接收定位消息 - go locate.StartLocate() - //若定位成功,则开始真正的接收和发送文件 - http.HandleFunc("/objects/", object.Handler) - http.HandleFunc("/temp/", temp.Handler) - err := http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil) - if err != nil { - log.Fatal("listen error", err) - } -} diff --git a/dataServer/heartbeat/heartbeat.go b/dataServer/heartbeat/heartbeat.go deleted file mode 100644 index e01a7bb..0000000 --- a/dataServer/heartbeat/heartbeat.go +++ /dev/null @@ -1,16 +0,0 @@ -package heartbeat - -import ( - "ObjectStorage/src/lib/rabbitmq" - "os" - "time" -) - -func StartHeartBeat() { - q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER")) - defer q.Close() - for { - q.Publish("apiServers", os.Getenv("LISTEN_ADDRESS")) - time.Sleep(5 * time.Second) - } -} diff --git a/dataServer/locate/locate.go b/dataServer/locate/locate.go deleted file mode 100644 index 93b3dbb..0000000 --- a/dataServer/locate/locate.go +++ /dev/null @@ -1,68 +0,0 @@ -package locate - -import ( - "ObjectStorage/src/lib/rabbitmq" - "ObjectStorage/src/lib/types" - "os" - "path/filepath" - "strconv" - "strings" - "sync" -) - -var objects = make(map[string]int) -var mutex sync.Mutex - -func Locate(hash string) int { - mutex.Lock() - id, ok := objects[hash] - mutex.Unlock() - if !ok { - return -1 - } - return id -} -func Add(hash string, id int) { - mutex.Lock() - objects[hash] = id - mutex.Unlock() -} - -func Del(hash string) { - mutex.Lock() - delete(objects, hash) - mutex.Unlock() -} - -func StartLocate() { - q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER")) - defer q.Close() - q.Bind("dataServers") - c := q.Consume() - for msg := range c { - hash, err := strconv.Unquote(string(msg.Body)) - if err != nil { - panic(err) - } - id := Locate(hash) - if id != -1 { - q.Send(msg.ReplyTo, types.LocateMessage{Addr: os.Getenv("LISTEN_ADDRESS"), Id: id}) - } - } -} - -func CollectObjects() { - files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*") - for i := range files { - file := strings.Split(filepath.Base(files[i]), ".") - if len(file) != 3 { - panic(files[i]) - } - hash := file[0] - id, err := strconv.Atoi(file[1]) - if err != nil { - panic(err) - } - objects[hash] = id - } -} diff --git a/dataServer/object/object.go b/dataServer/object/object.go deleted file mode 100644 index 06687e3..0000000 --- a/dataServer/object/object.go +++ /dev/null @@ -1,84 +0,0 @@ -package object - -import ( - "ObjectStorage/dataServer/locate" - "compress/gzip" - "crypto/sha256" - "encoding/base64" - "io" - "log" - "net/http" - "net/url" - "os" - "path/filepath" - "strings" -) - -func Handler(w http.ResponseWriter, r *http.Request) { - m := r.Method - if m == http.MethodGet { - get(w, r) - return - } - if m == http.MethodDelete { - del(w, r) - return - } - w.WriteHeader(http.StatusMethodNotAllowed) -} - -func get(w http.ResponseWriter, r *http.Request) { - file := getFile(strings.Split(r.URL.EscapedPath(), "/")[2]) - if file == "" { - w.WriteHeader(http.StatusNotFound) - return - } - sendFile(w, file) -} - -func getFile(name string) string { - fileName := os.Getenv("STORAGE_ROOT") + "/objects/" + name + ".*" - files, _ := filepath.Glob(fileName) - if len(files) != 1 { - locate.Del(strings.Split(name, ".")[0]) - return "" - } - file := files[0] - h := sha256.New() - sendFile(h, file) - shardHash := url.PathEscape(base64.StdEncoding.EncodeToString(h.Sum(nil))) - hash := strings.Split(file, ".")[2] - if shardHash != hash { - log.Println("object hash mismatched, remove", file) - locate.Del(hash) - os.Remove(file) - return "" - } - return file -} - -func sendFile(w io.Writer, file string) { - f, e := os.Open(file) - if e != nil { - log.Println(e) - return - } - defer f.Close() - gzipStream, e := gzip.NewReader(f) - if e != nil { - log.Println(e) - return - } - io.Copy(w, gzipStream) - gzipStream.Close() -} - -func del(w http.ResponseWriter, r *http.Request) { - hash := strings.Split(r.URL.EscapedPath(), "/")[2] - files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/" + hash + ".*") - if len(files) != 1 { - return - } - locate.Del(hash) - os.Rename(files[0], os.Getenv("STORAGE_ROOT")+"/garbage/"+filepath.Base(files[0])) -} diff --git a/dataServer/temp/commit.go b/dataServer/temp/commit.go deleted file mode 100644 index e4e6c3e..0000000 --- a/dataServer/temp/commit.go +++ /dev/null @@ -1,36 +0,0 @@ -package temp - -import ( - "ObjectStorage/dataServer/locate" - "ObjectStorage/src/lib/utils" - "compress/gzip" - "io" - "net/url" - "os" - "strconv" - "strings" -) - -func (t *tempInfo) hash() string { - return strings.Split(t.Name, ".")[0] -} - -func (t *tempInfo) id() int { - s := strings.Split(t.Name, ".") - id, _ := strconv.Atoi(s[1]) - return id -} - -func commitTempObject(datFile string, tempinfo *tempInfo) { - f, _ := os.Open(datFile) - shardHash := url.PathEscape(utils.CalculateHash(f)) - defer f.Close() - f.Seek(0, io.SeekStart) - w, _ := os.Create(os.Getenv("STORAGE_ROOT") + "/objects/" + tempinfo.Name + "." + shardHash) - w2 := gzip.NewWriter(w) - io.Copy(w2, f) - w2.Close() - locate.Add(tempinfo.hash(), tempinfo.id()) - os.Remove(datFile) - os.Remove(strings.Split(datFile, ".")[0]) -} diff --git a/dataServer/temp/delete.go b/dataServer/temp/delete.go deleted file mode 100644 index 59150b9..0000000 --- a/dataServer/temp/delete.go +++ /dev/null @@ -1,15 +0,0 @@ -package temp - -import ( - "net/http" - "os" - "strings" -) - -func del(w http.ResponseWriter, r *http.Request) { - uuid := strings.Split(r.URL.EscapedPath(), "/")[2] - infoFile := os.Getenv("STORAGE_ROOT") + "/temp/" + uuid - datFile := infoFile + ".dat" - os.Remove(infoFile) - os.Remove(datFile) -} diff --git a/dataServer/temp/get.go b/dataServer/temp/get.go deleted file mode 100644 index 8e9b1af..0000000 --- a/dataServer/temp/get.go +++ /dev/null @@ -1,21 +0,0 @@ -package temp - -import ( - "io" - "log" - "net/http" - "os" - "strings" -) - -func get(w http.ResponseWriter, r *http.Request) { - uuid := strings.Split(r.URL.EscapedPath(), "/")[2] - f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/temp/" + uuid + ".dat") - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusNotFound) - return - } - defer f.Close() - io.Copy(w, f) -} diff --git a/dataServer/temp/handler.go b/dataServer/temp/handler.go deleted file mode 100644 index 29c7a0b..0000000 --- a/dataServer/temp/handler.go +++ /dev/null @@ -1,32 +0,0 @@ -package temp - -import "net/http" - -func Handler(w http.ResponseWriter, r *http.Request) { - m := r.Method - if m == http.MethodPut { - put(w, r) - return - } - if m == http.MethodPatch { - patch(w, r) - return - } - if m == http.MethodPost { - post(w, r) - return - } - if m == http.MethodDelete { - del(w, r) - return - } - if m == http.MethodHead { - head(w, r) - return - } - if m == http.MethodGet { - get(w, r) - return - } - w.WriteHeader(http.StatusMethodNotAllowed) -} diff --git a/dataServer/temp/head.go b/dataServer/temp/head.go deleted file mode 100644 index 755f8c4..0000000 --- a/dataServer/temp/head.go +++ /dev/null @@ -1,27 +0,0 @@ -package temp - -import ( - "fmt" - "log" - "net/http" - "os" - "strings" -) - -func head(w http.ResponseWriter, r *http.Request) { - uuid := strings.Split(r.URL.EscapedPath(), "/")[2] - f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/temp/" + uuid + ".dat") - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusNotFound) - return - } - defer f.Close() - info, e := f.Stat() - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - return - } - w.Header().Set("content-length", fmt.Sprintf("%d", info.Size())) -} diff --git a/dataServer/temp/patch.go b/dataServer/temp/patch.go deleted file mode 100644 index 8a577b5..0000000 --- a/dataServer/temp/patch.go +++ /dev/null @@ -1,61 +0,0 @@ -package temp - -import ( - "encoding/json" - "io" - "io/ioutil" - "log" - "net/http" - "os" - "strings" -) - -func patch(w http.ResponseWriter, r *http.Request) { - uuid := strings.Split(r.URL.EscapedPath(), "/")[2] - tempinfo, e := readFromFile(uuid) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusNotFound) - return - } - infoFile := os.Getenv("STORAGE_ROOT") + "/temp/" + uuid - datFile := infoFile + ".dat" - f, e := os.OpenFile(datFile, os.O_WRONLY|os.O_APPEND, 0) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - return - } - defer f.Close() - _, e = io.Copy(f, r.Body) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - return - } - info, e := f.Stat() - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - return - } - actual := info.Size() - if actual > tempinfo.Size { - os.Remove(datFile) - os.Remove(infoFile) - log.Println("actual size", actual, "exceeds", tempinfo.Size) - w.WriteHeader(http.StatusInternalServerError) - } -} - -func readFromFile(uuid string) (*tempInfo, error) { - f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/temp/" + uuid) - if e != nil { - return nil, e - } - defer f.Close() - b, _ := ioutil.ReadAll(f) - var info tempInfo - json.Unmarshal(b, &info) - return &info, nil -} diff --git a/dataServer/temp/post.go b/dataServer/temp/post.go deleted file mode 100644 index 7173c9d..0000000 --- a/dataServer/temp/post.go +++ /dev/null @@ -1,49 +0,0 @@ -package temp - -import ( - "encoding/json" - "log" - "net/http" - "os" - "os/exec" - "strconv" - "strings" -) - -type tempInfo struct { - Uuid string - Name string - Size int64 -} - -func post(w http.ResponseWriter, r *http.Request) { - output, _ := exec.Command("uuidgen").Output() - uuid := strings.TrimSuffix(string(output), "\n") - name := strings.Split(r.URL.EscapedPath(), "/")[2] - size, e := strconv.ParseInt(r.Header.Get("size"), 0, 64) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - return - } - t := tempInfo{Uuid: uuid, Name: name, Size: size} - e = t.writeToFile() - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - return - } - os.Create(os.Getenv("STORAGE_ROOT") + "/temp/" + t.Uuid + ".dat") - w.Write([]byte(uuid)) -} - -func (t *tempInfo) writeToFile() error { - f, e := os.Create(os.Getenv("STORAGE_ROOT") + "/temp/" + t.Uuid) - if e != nil { - return e - } - defer f.Close() - b, _ := json.Marshal(t) - f.Write(b) - return nil -} diff --git a/dataServer/temp/put.go b/dataServer/temp/put.go deleted file mode 100644 index a2ef0d3..0000000 --- a/dataServer/temp/put.go +++ /dev/null @@ -1,42 +0,0 @@ -package temp - -import ( - "log" - "net/http" - "os" - "strings" -) - -func put(w http.ResponseWriter, r *http.Request) { - uuid := strings.Split(r.URL.EscapedPath(), "/")[2] - tempinfo, e := readFromFile(uuid) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusNotFound) - return - } - infoFile := os.Getenv("STORAGE_ROOT") + "/temp/" + uuid - datFile := infoFile + ".dat" - f, e := os.Open(datFile) - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - return - } - defer f.Close() - info, e := f.Stat() - if e != nil { - log.Println(e) - w.WriteHeader(http.StatusInternalServerError) - return - } - actual := info.Size() - os.Remove(infoFile) - if actual != tempinfo.Size { - os.Remove(datFile) - log.Println("actual size mismatch, expect", tempinfo.Size, "actual", actual) - w.WriteHeader(http.StatusInternalServerError) - return - } - commitTempObject(datFile, tempinfo) -} diff --git a/deletefile.sh b/deletefile.sh deleted file mode 100644 index 4a651d4..0000000 --- a/deletefile.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - -for i in `seq 1 6` -do - rm -rf /tmp/$i/objects - rm -rf /tmp/$i/temp -done \ No newline at end of file diff --git a/go.mod b/go.mod deleted file mode 100644 index 20397be..0000000 --- a/go.mod +++ /dev/null @@ -1,9 +0,0 @@ -module ObjectStorage - -go 1.14 - -require ( - github.com/klauspost/reedsolomon v1.9.9 - github.com/mmcloughlin/avo v0.0.0-20201130012700-45c8ae10fd12 // indirect - github.com/streadway/amqp v1.0.0 -) diff --git a/go.sum b/go.sum deleted file mode 100644 index b5c4a63..0000000 --- a/go.sum +++ /dev/null @@ -1,34 +0,0 @@ -github.com/klauspost/cpuid v1.2.4 h1:EBfaK0SWSwk+fgk6efYFWdzl8MwRWoOO1gkmiaTXPW4= -github.com/klauspost/cpuid v1.2.4/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= -github.com/klauspost/reedsolomon v1.9.9 h1:qCL7LZlv17xMixl55nq2/Oa1Y86nfO8EqDfv2GHND54= -github.com/klauspost/reedsolomon v1.9.9/go.mod h1:O7yFFHiQwDR6b2t63KPUpccPtNdp5ADgh1gg4fd12wo= -github.com/mmcloughlin/avo v0.0.0-20201130012700-45c8ae10fd12 h1:JJvkIBIdkzz71+2UD6CHfjDC2O3fCZJ98KUaB70gr00= -github.com/mmcloughlin/avo v0.0.0-20201130012700-45c8ae10fd12/go.mod h1:6aKT4zZIrpGqB3RpFU14ByCSSyKY6LfJz4J/JJChHfI= -github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= -github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= -github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -golang.org/x/arch v0.0.0-20201008161808-52c3e6f60cff/go.mod h1:flIaEI6LNU6xOCD5PaJvn9wGP0agmIOqjrtsKGRguv4= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= -golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174 h1:0rx0F4EjJNbxTuzWe0KjKcIzs+3VEb/Mrs/d1ciNz1c= -golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/hash b/hash deleted file mode 100644 index 0632cf5..0000000 --- a/hash +++ /dev/null @@ -1 +0,0 @@ -b3BlbnNzbCBkZ3N0IC1zaGEyNTYgLWJpbmFyeSAvdG1wL2ZpbGUK diff --git a/initenv.sh b/initenv.sh deleted file mode 100644 index 1d38faa..0000000 --- a/initenv.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash - -for i in `seq 1 6` -do - mkdir -p /tmp/$i/objects - mkdir -p /tmp/$i/temp - mkdir -p /tmp/$i/garbage -done - -A -A -A -A -A -A -A -done diff --git a/repairTools/deleteOldMetadata/deleteOldMetadata.go b/repairTools/deleteOldMetadata/deleteOldMetadata.go deleted file mode 100644 index 87083c6..0000000 --- a/repairTools/deleteOldMetadata/deleteOldMetadata.go +++ /dev/null @@ -1,23 +0,0 @@ -package main - -import ( - "ObjectStorage/src/lib/es" - "log" -) - -const MIN_VERSION_COUNT = 5 - -func main() { - buckets, e := es.SearchVersionStatus(MIN_VERSION_COUNT + 1) - if e != nil { - log.Println(e) - return - } - log.Println(len(buckets)) - for i := range buckets { - bucket := buckets[i] - for v := 0; v < bucket.Doc_count-MIN_VERSION_COUNT; v++ { - es.DelMetadata(bucket.Key, v+int(bucket.Min_version.Value)) - } - } -} diff --git a/repairTools/deleteOrphanObject/deleteOrphanObject.go b/repairTools/deleteOrphanObject/deleteOrphanObject.go deleted file mode 100644 index a63e094..0000000 --- a/repairTools/deleteOrphanObject/deleteOrphanObject.go +++ /dev/null @@ -1,34 +0,0 @@ -package main - -import ( - "ObjectStorage/src/lib/es" - "log" - "net/http" - "os" - "path/filepath" - "strings" -) - -func main() { - files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*") - - for i := range files { - hash := strings.Split(filepath.Base(files[i]), ".")[0] - hashInMetadata, e := es.HasHash(hash) - if e != nil { - log.Println(e) - return - } - if !hashInMetadata { - del(hash) - } - } -} - -func del(hash string) { - log.Println("delete", hash) - url := "http://" + os.Getenv("LISTEN_ADDRESS") + "/objects/" + hash - request, _ := http.NewRequest("DELETE", url, nil) - client := http.Client{} - client.Do(request) -} diff --git a/repairTools/objectScanner/objectScanner.go b/repairTools/objectScanner/objectScanner.go deleted file mode 100644 index 62d2aa6..0000000 --- a/repairTools/objectScanner/objectScanner.go +++ /dev/null @@ -1,39 +0,0 @@ -package main - -import ( - "ObjectStorage/apiServer/object" - "ObjectStorage/src/lib/es" - "ObjectStorage/src/lib/utils" - "log" - "os" - "path/filepath" - "strings" -) - -func main() { - files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*") - - for i := range files { - hash := strings.Split(filepath.Base(files[i]), ".")[0] - verify(hash) - } -} - -func verify(hash string) { - log.Println("verify", hash) - size, e := es.SearchHashSize(hash) - if e != nil { - log.Println(e) - return - } - stream, e := object.GetStream(hash, size) - if e != nil { - log.Println(e) - return - } - d := utils.CalculateHash(stream) - if d != hash { - log.Printf("object hash mismatch, calculated=%s, requested=%s", d, hash) - } - stream.Close() -} diff --git a/src/lib/es/es.go b/src/lib/es/es.go deleted file mode 100644 index 23cad6a..0000000 --- a/src/lib/es/es.go +++ /dev/null @@ -1,209 +0,0 @@ -package es - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "os" - "strings" -) - -type Metadata struct { - Name string `json:"name"` - Version int `json:"version"` - Size int64 `json:"size"` - Hash string `json:"hash"` -} - -type Bucket struct { - Key string - Doc_count int - Min_version struct { - Value float32 - } -} - -type aggregateResult struct { - Aggregations struct { - Group_by_name struct { - Buckets []Bucket - } - } -} - -func getMetadata(name string, versionId int) (meta Metadata, err error) { - url := fmt.Sprintf("http://%s/metadata/objects/%s_%d/_source", os.Getenv("ES_SERVER"), name, versionId) - r, err := http.Get(url) - if err != nil { - return - } - if r.StatusCode != http.StatusOK { - err = fmt.Errorf("failed to get %s_%d", name, versionId) - return - } - //获取到文件元数据 - result, _ := ioutil.ReadAll(r.Body) - json.Unmarshal(result, &meta) - return -} - -type hit struct { - Source Metadata `json:"_source"` -} -type searchResult struct { - Hits struct { - Total int - Hits []hit - } -} - -func SearchLatestVersion(name string) (meta Metadata, err error) { - url := fmt.Sprintf("http://%s/metadata/_search?q=name:%s&size=1&sort=version:desc", os.Getenv("ES_SERVER"), name) - r, err := http.Get(url) - if err != nil { - return - } - if r.StatusCode != http.StatusOK { - err = fmt.Errorf("fialed to find %s latest version", name) - return - } - result, _ := ioutil.ReadAll(r.Body) - var sr searchResult - json.Unmarshal(result, &sr) - if len(sr.Hits.Hits) != 0 { - meta = sr.Hits.Hits[0].Source - - } - return -} -func GetMetadata(name string, version int) (meta Metadata, err error) { - if version == 0 { - return SearchLatestVersion(name) - } - return getMetadata(name, version) -} - -func PutMetadata(name string, version int, size int64, hash string) error { - doc := fmt.Sprintf(`{"name":"%s","version":%d,"size":%d,"hash":"%s"}`, name, version, size, hash) - client := http.Client{} - url := fmt.Sprintf("http://%s/metadata/objects/%s_%d?op_type=create", os.Getenv("ES_SERVER"), name, version) - request, _ := http.NewRequest("PUT", url, strings.NewReader(doc)) - request.Header.Add("Content-Type", "application/json") - request.Header.Add("datatype", "json") - r, err := client.Do(request) - if err != nil { - return err - } - //如果多个客户端同时上传一个元数据,只有一个成功,其他的会增加版本号继续递归上传,直到成功 - if r.StatusCode == http.StatusConflict { - return PutMetadata(name, version+1, size, hash) - } - if r.StatusCode != http.StatusCreated { - result, _ := ioutil.ReadAll(r.Body) - return fmt.Errorf("failed to put metadata : %d %s", r.StatusCode, string(result)) - } - return nil -} - -func AddVersion(name, hash string, size int64) error { - version, e := SearchLatestVersion(name) - if e != nil { - return e - } - return PutMetadata(name, version.Version+1, size, hash) -} - -func SearchAllVersions(name string, from, size int) ([]Metadata, error) { - //构造请求url - url := fmt.Sprintf("http://%s/metadata/objects/_search?sort=name,version&from=%d&size=%d", os.Getenv("ES_SERVER"), from, size) - if name != "" { - url += "&q=name:" + name - } - fmt.Println(url) - r, err := http.Get(url) - if err != nil { - return nil, err - } - metas := make([]Metadata, 0) - result, _ := ioutil.ReadAll(r.Body) - var sr searchResult - json.Unmarshal(result, &sr) - for i := range sr.Hits.Hits { - metas = append(metas, sr.Hits.Hits[i].Source) - } - return metas, nil -} - -func SearchVersionStatus(min_doc_count int) ([]Bucket, error) { - client := http.Client{} - url := fmt.Sprintf("http://%s/metadata/_search", os.Getenv("ES_SERVER")) - body := fmt.Sprintf(` - { - "size": 0, - "aggs": { - "group_by_name": { - "terms": { - "field": "name", - "min_doc_count": "%d" - }, - "aggs": { - "min_version": { - "min": { - "field": "version" - } - } - } - } - } - }`, min_doc_count) - request, _ := http.NewRequest("GET", url, strings.NewReader(body)) - r, e := client.Do(request) - if e != nil { - return nil, e - } - b, _ := ioutil.ReadAll(r.Body) - var ar aggregateResult - json.Unmarshal(b, &ar) - return ar.Aggregations.Group_by_name.Buckets, nil -} - -func DelMetadata(name string, version int) { - client := http.Client{} - url := fmt.Sprintf("http://%s/metadata/objects/%s_%d", - os.Getenv("ES_SERVER"), name, version) - request, _ := http.NewRequest("DELETE", url, nil) - client.Do(request) -} - -func HasHash(hash string) (bool, error) { - url := fmt.Sprintf("http://%s/metadata/_search?q=hash:%s&size=0", os.Getenv("ES_SERVER"), hash) - r, e := http.Get(url) - if e != nil { - return false, e - } - b, _ := ioutil.ReadAll(r.Body) - var sr searchResult - json.Unmarshal(b, &sr) - return sr.Hits.Total != 0, nil -} - -func SearchHashSize(hash string) (size int64, e error) { - url := fmt.Sprintf("http://%s/metadata/_search?q=hash:%s&size=1", - os.Getenv("ES_SERVER"), hash) - r, e := http.Get(url) - if e != nil { - return - } - if r.StatusCode != http.StatusOK { - e = fmt.Errorf("fail to search hash size: %d", r.StatusCode) - return - } - result, _ := ioutil.ReadAll(r.Body) - var sr searchResult - json.Unmarshal(result, &sr) - if len(sr.Hits.Hits) != 0 { - size = sr.Hits.Hits[0].Source.Size - } - return -} diff --git a/src/lib/objectstream/get.go b/src/lib/objectstream/get.go deleted file mode 100644 index 14ccc37..0000000 --- a/src/lib/objectstream/get.go +++ /dev/null @@ -1,32 +0,0 @@ -package objectstream - -import ( - "fmt" - "io" - "net/http" -) - -type GetStream struct { - reader io.Reader -} - -func (r *GetStream) Read(p []byte) (n int, err error) { - return r.reader.Read(p) -} -func newGetStream(url string) (*GetStream, error) { - r, err := http.Get(url) - if err != nil { - return nil, err - } - if r.StatusCode != http.StatusOK { - return nil, fmt.Errorf("dataServer return http code %d", r.StatusCode) - } - return &GetStream{r.Body}, nil -} - -func NewGetStream(server string, object string) (*GetStream, error) { - if server == "" || object == "" { - return nil, fmt.Errorf("invalid server %s object %s", server, object) - } - return newGetStream("http://" + server + "/objects/" + object) -} diff --git a/src/lib/objectstream/put.go b/src/lib/objectstream/put.go deleted file mode 100644 index ef8ab50..0000000 --- a/src/lib/objectstream/put.go +++ /dev/null @@ -1,37 +0,0 @@ -package objectstream - -import ( - "fmt" - "io" - "net/http" -) - -type PutStream struct { - writer *io.PipeWriter - c chan error -} - -func (w *PutStream) Write(p []byte) (n int, err error) { - return w.writer.Write(p) -} -func (w *PutStream) Close() error { - w.writer.Close() - //close执行后http cleint请求会发送出去 - //将HTTPclient doRequest结果返回 - return <-w.c -} - -func NewPutStream(server, object string) *PutStream { - reader, writer := io.Pipe() - c := make(chan error) - go func() { - request, _ := http.NewRequest("PUT", "http://"+server+"/objects/"+object, reader) - client := http.Client{} - r, e := client.Do(request) - if e == nil && r.StatusCode != http.StatusOK { - e = fmt.Errorf("dataServer return http code %d", r.StatusCode) - } - c <- e - }() - return &PutStream{writer, c} -} diff --git a/src/lib/objectstream/temp.go b/src/lib/objectstream/temp.go deleted file mode 100644 index 8bf4a2e..0000000 --- a/src/lib/objectstream/temp.go +++ /dev/null @@ -1,63 +0,0 @@ -package objectstream - -import ( - "fmt" - "io/ioutil" - "log" - "net/http" - "strings" -) - -type TempPutStream struct { - Server string - Uuid string -} - -func NewTempPutStream(server, object string, size int64) (*TempPutStream, error) { - request, e := http.NewRequest("POST", "http://"+server+"/temp/"+object, nil) - if e != nil { - return nil, e - } - request.Header.Set("size", fmt.Sprintf("%d", size)) - client := http.Client{} - response, e := client.Do(request) - if e != nil { - return nil, e - } - uuid, e := ioutil.ReadAll(response.Body) - if e != nil { - return nil, e - } - return &TempPutStream{server, string(uuid)}, nil -} - -func (w *TempPutStream) Write(p []byte) (n int, err error) { - request, e := http.NewRequest("PATCH", "http://"+w.Server+"/temp/"+w.Uuid, strings.NewReader(string(p))) - if e != nil { - return 0, e - } - client := http.Client{} - r, e := client.Do(request) - if e != nil { - return 0, e - } - if r.StatusCode != http.StatusOK { - return 0, fmt.Errorf("dataServer return http code %d", r.StatusCode) - } - return len(p), nil -} - -func (w *TempPutStream) Commit(good bool) { - log.Println("commit status:", good) - method := http.MethodDelete - if good { - method = http.MethodPut - } - request, _ := http.NewRequest(method, "http://"+w.Server+"/temp/"+w.Uuid, nil) - client := http.Client{} - client.Do(request) -} - -func NewTempGetStream(server, uuid string) (*GetStream, error) { - return newGetStream("http://" + server + "/temp/" + uuid) -} diff --git a/src/lib/rabbitmq/rabbitmq.go b/src/lib/rabbitmq/rabbitmq.go deleted file mode 100644 index 238fd4e..0000000 --- a/src/lib/rabbitmq/rabbitmq.go +++ /dev/null @@ -1,112 +0,0 @@ -package rabbitmq - -import ( - "encoding/json" - "github.com/streadway/amqp" -) - -type RabbitMQ struct { - channel *amqp.Channel - conn *amqp.Connection - Name string - exchange string -} - -func New(s string) *RabbitMQ { - conn, e := amqp.Dial(s) - if e != nil { - panic(e) - } - - ch, e := conn.Channel() - if e != nil { - panic(e) - } - - q, e := ch.QueueDeclare( - "", // name - false, // durable - true, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if e != nil { - panic(e) - } - - mq := new(RabbitMQ) - mq.channel = ch - mq.conn = conn - mq.Name = q.Name - return mq -} - -func (q *RabbitMQ) Bind(exchange string) { - e := q.channel.QueueBind( - q.Name, // queue name - "", // routing key - exchange, // exchange - false, - nil) - if e != nil { - panic(e) - } - q.exchange = exchange -} - -func (q *RabbitMQ) Send(queue string, body interface{}) { - str, e := json.Marshal(body) - if e != nil { - panic(e) - } - e = q.channel.Publish("", - queue, - false, - false, - amqp.Publishing{ - ReplyTo: q.Name, - Body: []byte(str), - }) - if e != nil { - panic(e) - } -} - -func (q *RabbitMQ) Publish(exchange string, body interface{}) { - str, e := json.Marshal(body) - if e != nil { - panic(e) - } - e = q.channel.Publish(exchange, - "", - false, - false, - amqp.Publishing{ - ReplyTo: q.Name, - Body: []byte(str), - }) - if e != nil { - panic(e) - } -} - -func (q *RabbitMQ) Consume() <-chan amqp.Delivery { - c, e := q.channel.Consume(q.Name, - "", - true, - false, - false, - false, - nil, - ) - if e != nil { - panic(e) - } - return c -} - -func (q *RabbitMQ) Close() { - q.channel.Close() - q.conn.Close() -} diff --git a/src/lib/rabbitmq/rabbitmq_test.go b/src/lib/rabbitmq/rabbitmq_test.go deleted file mode 100644 index e438f45..0000000 --- a/src/lib/rabbitmq/rabbitmq_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package rabbitmq - -import ( - "encoding/json" - "testing" -) - -const host = "amqp://test:test@10.29.102.173:5672" - -func TestPublish(t *testing.T) { - q := New(host) - defer q.Close() - q.Bind("test") - - q2 := New(host) - defer q2.Close() - q2.Bind("test") - - q3 := New(host) - defer q3.Close() - - expect := "test" - q3.Publish("test2", "any") - q3.Publish("test", expect) - - c := q.Consume() - msg := <-c - var actual interface{} - err := json.Unmarshal(msg.Body, &actual) - if err != nil { - t.Error(err) - } - if actual != expect { - t.Errorf("expected %s, actual %s", expect, actual) - } - if msg.ReplyTo != q3.Name { - t.Error(msg) - } - - c2 := q2.Consume() - msg = <-c2 - err = json.Unmarshal(msg.Body, &actual) - if err != nil { - t.Error(err) - } - if actual != expect { - t.Errorf("expected %s, actual %s", expect, actual) - } - if msg.ReplyTo != q3.Name { - t.Error(msg) - } - q2.Send(msg.ReplyTo, "test3") - c3 := q3.Consume() - msg = <-c3 - if string(msg.Body) != `"test3"` { - t.Error(string(msg.Body)) - } -} - -func TestSend(t *testing.T) { - q := New(host) - defer q.Close() - - q2 := New(host) - defer q2.Close() - - expect := "test" - expect2 := "test2" - q2.Send(q.Name, expect) - q2.Send(q2.Name, expect2) - - c := q.Consume() - msg := <-c - var actual interface{} - err := json.Unmarshal(msg.Body, &actual) - if err != nil { - t.Error(err) - } - if actual != expect { - t.Errorf("expected %s, actual %s", expect, actual) - } - - c2 := q2.Consume() - msg = <-c2 - err = json.Unmarshal(msg.Body, &actual) - if err != nil { - t.Error(err) - } - if actual != expect2 { - t.Errorf("expected %s, actual %s", expect2, actual) - } -} diff --git a/src/lib/rs/common.go b/src/lib/rs/common.go deleted file mode 100644 index d417c07..0000000 --- a/src/lib/rs/common.go +++ /dev/null @@ -1,9 +0,0 @@ -package rs - -const ( - DATA_SHARDS = 4 - PARITY_SHARDS = 2 - ALL_SHARDS = DATA_SHARDS + PARITY_SHARDS - BLOCK_PER_SHARD = 8000 - BLOCK_SIZE = BLOCK_PER_SHARD * DATA_SHARDS -) diff --git a/src/lib/rs/decoder.go b/src/lib/rs/decoder.go deleted file mode 100644 index bb33878..0000000 --- a/src/lib/rs/decoder.go +++ /dev/null @@ -1,84 +0,0 @@ -package rs - -import ( - "ObjectStorage/src/lib/objectstream" - "github.com/klauspost/reedsolomon" - "io" - "log" -) - -type decoder struct { - readers []io.Reader - writers []io.Writer - enc reedsolomon.Encoder - size int64 - cache []byte - cacheSize int - total int64 -} - -func NewDecoder(readers []io.Reader, writers []io.Writer, size int64) *decoder { - enc, _ := reedsolomon.New(DATA_SHARDS, PARITY_SHARDS) - return &decoder{readers, writers, enc, size, nil, 0, 0} -} - -func (d *decoder) Read(p []byte) (n int, err error) { - if d.cacheSize == 0 { - e := d.getData() - if e != nil { - return 0, e - } - } - length := len(p) - if d.cacheSize < length { - length = d.cacheSize - } - d.cacheSize -= length - copy(p, d.cache[:length]) - d.cache = d.cache[length:] - return length, nil -} - -func (d *decoder) getData() error { - if d.total == d.size { - return io.EOF - } - shards := make([][]byte, ALL_SHARDS) - repairIds := make([]int, 0) - for i := range shards { - if d.readers[i] == nil { - repairIds = append(repairIds, i) - } else { - shards[i] = make([]byte, BLOCK_PER_SHARD) - n, e := io.ReadFull(d.readers[i], shards[i]) - if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF { - shards[i] = nil - } else if n != BLOCK_PER_SHARD { - shards[i] = shards[i][:n] - } - } - } - e := d.enc.Reconstruct(shards) - if e != nil { - return e - } - for i := range repairIds { - id := repairIds[i] - _, err := d.writers[id].Write(shards[id]) - if err != nil { - d.writers[id].(*objectstream.TempPutStream).Commit(false) - log.Println(err) - } - d.writers[id].(*objectstream.TempPutStream).Commit(true) - } - for i := 0; i < DATA_SHARDS; i++ { - shardSize := int64(len(shards[i])) - if d.total+shardSize > d.size { - shardSize -= d.total + shardSize - d.size - } - d.cache = append(d.cache, shards[i][:shardSize]...) - d.cacheSize += int(shardSize) - d.total += shardSize - } - return nil -} diff --git a/src/lib/rs/encoder.go b/src/lib/rs/encoder.go deleted file mode 100644 index 7eb8f4c..0000000 --- a/src/lib/rs/encoder.go +++ /dev/null @@ -1,47 +0,0 @@ -package rs - -import ( - "github.com/klauspost/reedsolomon" - "io" -) - -type encoder struct { - writers []io.Writer - enc reedsolomon.Encoder - cache []byte -} - -func NewEncoder(writers []io.Writer) *encoder { - enc, _ := reedsolomon.New(DATA_SHARDS, PARITY_SHARDS) - return &encoder{writers, enc, nil} -} - -func (e *encoder) Write(p []byte) (n int, err error) { - length := len(p) - current := 0 - for length != 0 { - next := BLOCK_SIZE - len(e.cache) - if next > length { - next = length - } - e.cache = append(e.cache, p[current:current+next]...) - if len(e.cache) == BLOCK_SIZE { - e.Flush() - } - current += next - length -= next - } - return len(p), nil -} - -func (e *encoder) Flush() { - if len(e.cache) == 0 { - return - } - shards, _ := e.enc.Split(e.cache) - e.enc.Encode(shards) - for i := range shards { - e.writers[i].Write(shards[i]) - } - e.cache = []byte{} -} diff --git a/src/lib/rs/get.go b/src/lib/rs/get.go deleted file mode 100644 index a628d1e..0000000 --- a/src/lib/rs/get.go +++ /dev/null @@ -1,74 +0,0 @@ -package rs - -import ( - "ObjectStorage/src/lib/objectstream" - "fmt" - "io" -) - -type RSGetStream struct { - *decoder -} - -//get every shards -func NewRSGetStream(locateInfo map[int]string, dataServers []string, hash string, size int64) (*RSGetStream, error) { - if len(locateInfo)+len(dataServers) != ALL_SHARDS { - return nil, fmt.Errorf("dataServers number is not match!") - } - readers := make([]io.Reader, ALL_SHARDS) - for i := 0; i < ALL_SHARDS; i++ { - server := locateInfo[i] - if server == "" { - locateInfo[i] = dataServers[0] - dataServers = dataServers[1:] - continue - } - //if server is not empty,read data from the server - reader, err := objectstream.NewGetStream(locateInfo[i], fmt.Sprintf("%s.%d", hash, i)) - if err == nil { - //get data shard i success - readers[i] = reader - } - } - writers := make([]io.Writer, ALL_SHARDS) - perShards := (size + DATA_SHARDS - 1) / DATA_SHARDS - var e error - for i := range readers { - if readers[i] == nil { - writers[i], e = objectstream.NewTempPutStream(locateInfo[i], fmt.Sprintf("%s.%d", hash, i), perShards) - if e != nil { - return nil, e - } - - } - } - dec := NewDecoder(readers, writers, size) - return &RSGetStream{dec}, nil -} - -func (s *RSGetStream) Close() { - for i := range s.writers { - if s.writers[i] != nil { - s.writers[i].(*objectstream.TempPutStream).Commit(true) - } - } -} - -func (s *RSGetStream) Seek(offset int64, whence int64) (int64, error) { - if whence != io.SeekCurrent { - panic("only support seekcurrent!") - } - if offset < 0 { - panic("only support forward seek") - } - length := int64(BLOCK_SIZE) - for offset != 0 { - if offset > length { - offset = length - } - buf := make([]byte, length) - io.ReadFull(s, buf) - offset -= length - } - return offset, nil -} diff --git a/src/lib/rs/put.go b/src/lib/rs/put.go deleted file mode 100644 index aa0bfb3..0000000 --- a/src/lib/rs/put.go +++ /dev/null @@ -1,38 +0,0 @@ -package rs - -import ( - "ObjectStorage/src/lib/objectstream" - "fmt" - "io" -) - -type RSPutStream struct { - *encoder -} - -func NewRSPutStream(dataServers []string, hash string, size int64) (*RSPutStream, error) { - if len(dataServers) != ALL_SHARDS { - return nil, fmt.Errorf("dataServers number mismatch") - } - - perShard := (size + DATA_SHARDS - 1) / DATA_SHARDS - writers := make([]io.Writer, ALL_SHARDS) - var e error - for i := range writers { - writers[i], e = objectstream.NewTempPutStream(dataServers[i], - fmt.Sprintf("%s.%d", hash, i), perShard) - if e != nil { - return nil, e - } - } - enc := NewEncoder(writers) - - return &RSPutStream{enc}, nil -} - -func (s *RSPutStream) Commit(success bool) { - s.Flush() - for i := range s.writers { - s.writers[i].(*objectstream.TempPutStream).Commit(success) - } -} diff --git a/src/lib/rs/resumable_get.go b/src/lib/rs/resumable_get.go deleted file mode 100644 index efa5276..0000000 --- a/src/lib/rs/resumable_get.go +++ /dev/null @@ -1,24 +0,0 @@ -package rs - -import ( - "ObjectStorage/src/lib/objectstream" - "io" -) - -type RSResumableGetStream struct { - *decoder -} - -func NewRSResumableGetStream(dataServers []string, uuids []string, size int64) (*RSResumableGetStream, error) { - readers := make([]io.Reader, ALL_SHARDS) - var e error - for i := 0; i < ALL_SHARDS; i++ { - readers[i], e = objectstream.NewTempGetStream(dataServers[i], uuids[i]) - if e != nil { - return nil, e - } - } - writers := make([]io.Writer, ALL_SHARDS) - dec := NewDecoder(readers, writers, size) - return &RSResumableGetStream{dec}, nil -} diff --git a/src/lib/rs/resumable_put.go b/src/lib/rs/resumable_put.go deleted file mode 100644 index d33f75b..0000000 --- a/src/lib/rs/resumable_put.go +++ /dev/null @@ -1,78 +0,0 @@ -package rs - -import ( - "ObjectStorage/src/lib/objectstream" - "ObjectStorage/src/lib/utils" - "encoding/base64" - "encoding/json" - "fmt" - "io" - "log" - "net/http" -) - -type resumableToken struct { - Name string - Size int64 - Hash string - Servers []string - Uuids []string -} - -type RSResumablePutStream struct { - *RSPutStream - *resumableToken -} - -func NewRSResumablePutStream(dataServers []string, name, hash string, size int64) (*RSResumablePutStream, error) { - putStream, e := NewRSPutStream(dataServers, hash, size) - if e != nil { - return nil, e - } - uuids := make([]string, ALL_SHARDS) - for i := range uuids { - uuids[i] = putStream.writers[i].(*objectstream.TempPutStream).Uuid - } - token := &resumableToken{name, size, hash, dataServers, uuids} - return &RSResumablePutStream{putStream, token}, nil -} - -func NewRSResumablePutStreamFromToken(token string) (*RSResumablePutStream, error) { - b, e := base64.StdEncoding.DecodeString(token) - if e != nil { - return nil, e - } - var t resumableToken - e = json.Unmarshal(b, &t) - if e != nil { - return nil, e - } - writers := make([]io.Writer, ALL_SHARDS) - for i := range writers { - writers[i] = &objectstream.TempPutStream{Server: t.Servers[i], Uuid: t.Uuids[i]} - } - enc := NewEncoder(writers) - return &RSResumablePutStream{&RSPutStream{enc}, &t}, nil -} - -func (s *RSResumablePutStream) ToToken() string { - b, _ := json.Marshal(s) - return base64.StdEncoding.EncodeToString(b) -} - -func (s *RSResumablePutStream) CurrentSize() int64 { - r, e := http.Head(fmt.Sprintf("http://%s/temp/%s", s.Servers[0], s.Uuids[0])) - if e != nil { - log.Println(e) - return -1 - } - if r.StatusCode != http.StatusOK { - log.Println(r.StatusCode) - return -1 - } - size := utils.GetSizeFromHeader(r.Header) * DATA_SHARDS - if size > s.Size { - size = s.Size - } - return size -} diff --git a/src/lib/types/types.go b/src/lib/types/types.go deleted file mode 100644 index b89c9ba..0000000 --- a/src/lib/types/types.go +++ /dev/null @@ -1,6 +0,0 @@ -package types - -type LocateMessage struct { - Addr string - Id int -} diff --git a/src/lib/utils/utils.go b/src/lib/utils/utils.go deleted file mode 100644 index 4342a37..0000000 --- a/src/lib/utils/utils.go +++ /dev/null @@ -1,45 +0,0 @@ -package utils - -import ( - "crypto/sha256" - "encoding/base64" - "io" - "net/http" - "strconv" - "strings" -) - -func GetHashFromHeader(h http.Header) string { - digest := h.Get("digest") - if len(digest) < 9 { - return "" - } - if digest[:8] != "SHA-256=" { - return "" - } - return digest[8:] -} - -func GetSizeFromHeader(h http.Header) int64 { - size, _ := strconv.ParseInt(h.Get("content-length"), 0, 64) - return size -} - -func CalculateHash(r io.Reader) string { - h := sha256.New() - io.Copy(h, r) - return base64.StdEncoding.EncodeToString(h.Sum(nil)) -} - -func GetOffsetFromHeader(h http.Header) int64 { - byteRange := h.Get("range") - if len(byteRange) < 7 { - return 0 - } - if byteRange[0:6] != "bytes=" { - return 0 - } - bytePos := strings.Split(byteRange[6:], "-") - offset, _ := strconv.ParseInt(bytePos[0], 0, 64) - return offset -} diff --git a/starttestenv.sh b/starttestenv.sh deleted file mode 100644 index b2ea801..0000000 --- a/starttestenv.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash -export RABBITMQ_SERVER=amqp://test:test@localhost:5672 -export ES_SERVER=localhost:9200 -LISTEN_ADDRESS=192.168.246.131:12341 STORAGE_ROOT=/tmp/1 go run ./dataServer/dataServer.go & -LISTEN_ADDRESS=192.168.246.131:12342 STORAGE_ROOT=/tmp/2 go run ./dataServer/dataServer.go & -LISTEN_ADDRESS=192.168.246.131:12343 STORAGE_ROOT=/tmp/3 go run ./dataServer/dataServer.go & -LISTEN_ADDRESS=192.168.246.131:12344 STORAGE_ROOT=/tmp/4 go run ./dataServer/dataServer.go & -LISTEN_ADDRESS=192.168.246.131:12345 STORAGE_ROOT=/tmp/5 go run ./dataServer/dataServer.go & -LISTEN_ADDRESS=192.168.246.131:12346 STORAGE_ROOT=/tmp/6 go run ./dataServer/dataServer.go & - -LISTEN_ADDRESS=192.168.246.131:12347 go run ./apiServer/apiServer.go & -LISTEN_ADDRESS=192.168.246.131:12348 go run ./apiServer/apiServer.go & diff --git a/stoptestenv.sh b/stoptestenv.sh deleted file mode 100755 index 4a131b3..0000000 --- a/stoptestenv.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash - -killall apiServer -killall dataServer diff --git a/test/put.sh b/test/put.sh deleted file mode 100755 index 09a3275..0000000 --- a/test/put.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/bash - -curl -v 192.168.246.129:12347/objects/test3 -XPUT -d"this is object test3" - -echo -n "this is object test3" | openssl dgst -sha256 -binary | base64 -curl -v 192.168.246.129:12347/objects/test3 -XPUT -d"this is object test3" -H "Digest: SHA-256=GYqqAdFPt+CScnUDc0/Gcu3kwcWmOADKNYpiZtdbgsM=" - -curl 192.168.246.129:12347/objects/test3 -echo - -echo -n "this is object test3 version 2" | openssl dgst -sha256 -binary | base64 -curl -v 192.168.246.129:12347/objects/test3 -XPUT -d"this is object test3 version 2" -H "Digest: SHA-256=cAPvsxZe1PR54zIESQy0BaxC1pYJIvaHSF3qEOZYYIo=" - -curl 192.168.246.129:12347/objects/test3 -echo - -curl 192.168.246.129:12347/objects/test3 -echo -curl 192.168.246.129:12347/locate/GYqqAdFPt+CScnUDc0%2FGcu3kwcWmOADKNYpiZtdbgsM= -echo -curl 192.168.246.129:12347/locate/cAPvsxZe1PR54zIESQy0BaxC1pYJIvaHSF3qEOZYYIo= -echo -curl 192.168.246.129:12347/versions/test3 -echo -curl 192.168.246.129:12347/objects/test3?version=1 -echo -curl -v 192.168.246.129:12347/objects/test3 -XDELETE - -curl -v 192.168.246.129:12347/objects/test3 -echo - -curl 192.168.246.129:12347/versions/test3 -echo -curl 192.168.246.129:12347/objects/test3?version=1 -echo -curl 192.168.246.129:12347/objects/test3?version=2 -echo diff --git a/test/putmapping.sh b/test/putmapping.sh deleted file mode 100755 index 67aad5d..0000000 --- a/test/putmapping.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - - -#curl "localhost:9200/metadata" -XDELETE - -curl -H "Content-Type: application/json" "localhost:9200/metadata" -XPUT -d'{"mappings":{"objects":{"properties":{"name":{"type":"text","index":"true","fielddata": "true"},"version":{"type":"integer"},"size":{"type":"integer"},"hash":{"type":"text"}}}}}' - diff --git a/test/test4.sh b/test/test4.sh deleted file mode 100644 index c27a6df..0000000 --- a/test/test4.sh +++ /dev/null @@ -1,19 +0,0 @@ -echo -n "this object will have only 1 instance" | openssl dgst -sha256 -binary | base64 - -curl -v 192.168.246.130:12347/objects/test4_1 -XPUT -d "this object will have only 1 instance" -H "Digest: SHA-256=incorrecthash" - -curl -v 192.168.246.130:12347/objects/test4_1 -XPUT -d "this object will have only 1 instance" -H "Digest: SHA-256=aWKQ2BipX94sb+h3xdTbWYAu1yzjn5vyFG2SOwUQIXY=" - -curl -v 192.168.246.130:12347/objects/test4_2 -XPUT -d "this object will have only 1 instance" -H "Digest: SHA-256=aWKQ2BipX94sb+h3xdTbWYAu1yzjn5vyFG2SOwUQIXY=" - -ls -ltr /tmp/?/objects -echo -curl 192.168.246.130:12347/objects/test4_1 -echo -curl 192.168.246.130:12347/objects/test4_2 -echo -curl 192.168.246.130:12347/locate/aWKQ2BipX94sb+h3xdTbWYAu1yzjn5vyFG2SOwUQIXY= -echo -curl 192.168.246.130:12347/versions/test4_1 -echo -curl 192.168.246.130:12347/versions/test4_2 \ No newline at end of file diff --git a/test/test6.sh b/test/test6.sh deleted file mode 100644 index a688a44..0000000 --- a/test/test6.sh +++ /dev/null @@ -1,35 +0,0 @@ -#!/bin/bash - -dd if=/dev/urandom of=/tmp/file bs=1000 count=100 - -openssl dgst -sha256 -binary /tmp/file | base64 - -dd if=/tmp/file of=/tmp/first bs=1000 count=50 - -dd if=/tmp/file of=/tmp/second bs=1000 skip=32 count=68 - - - - -curl -v 192.168.246.131:12347/objects/test6 -XPOST -H "Digest: SHA-256=$1" -H "Size: 100000" - - -curl -I 192.168.246.131:12347/$1 - -curl -v -XPUT --data-binary @/tmp/first 192.168.246.131:12347/$1 - -curl -I 192.168.246.131:12347/$1 - -curl -v -XPUT --data-binary @/tmp/second -H "range: bytes=32000-" 192.168.246.131:12347/$1 - -curl -I 192.168.246.131:12347/$1 - -curl -v -XPUT --data-binary @/tmp/last -H "range: bytes=96000-" 192.168.246.131:12347/$1 - -curl 192.168.246.131:12347/objects/test6 > /tmp/output - -diff -s /tmp/output /tmp/file - -curl 192.168.246.131:12347/objects/test6 -H "range: bytes=32000-" > /tmp/output2 - -diff -s /tmp/output2 /tmp/second \ No newline at end of file diff --git a/test/test6/test6_1.sh b/test/test6/test6_1.sh deleted file mode 100644 index 9058632..0000000 --- a/test/test6/test6_1.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/bash - -dd if=/dev/urandom of=/tmp/file bs=1000 count=100 - -base = $(base64 <<< openssl dgst -sha256 -binary <<< ($cat /tmp/file)) - -echo $base -curl -v 192.168.246.131:12347/objects/test6 -XPOST -H "Digest: SHA-256=${hash}" -H "Size: 100000" \ No newline at end of file diff --git a/test/test7/genFile.sh b/test/test7/genFile.sh deleted file mode 100644 index 0ec9b12..0000000 --- a/test/test7/genFile.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash - -dd if=/dev/zero of=/tmp/file bs=1M count=100 - -openssl dgst -sha256 -binary /tmp/file | base64 \ No newline at end of file diff --git a/test/test7/put.sh b/test/test7/put.sh deleted file mode 100644 index 706c8f2..0000000 --- a/test/test7/put.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/bin/bash - -curl -v 192.168.246.131:12347/objects/test7 -XPUT --data-binary @/tmp/file -H "Digest: SHA-256=IEkqTQ2E+L6xdn9mFiKfhdRMKCe2S9v7Jg7hL6EQng4=" - -curl -v 192.168.246.131:12347/objects/test7 -o /tmp/output - -diff -s /tmp/output /tmp/file - -ls -ltr /tmp/?/objects - -curl -v 192.168.246.131:12347/objects/test7 -H "Accept-Encoding: gzip" -o /tmp/output2.gz - -gunzip /tmp/output2.gz - -diff -s /tmp/output2 /tmp/file \ No newline at end of file diff --git a/test/test8/createGarbage.sh b/test/test8/createGarbage.sh deleted file mode 100644 index b55464d..0000000 --- a/test/test8/createGarbage.sh +++ /dev/null @@ -1,4 +0,0 @@ -for i in `seq 1 6` -do - mkdir -p /tmp/$i/garbage -done \ No newline at end of file diff --git a/test/test8/push.sh b/test/test8/push.sh deleted file mode 100644 index 17f20f3..0000000 --- a/test/test8/push.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/bash - -export RABBITMQ_SERVER=amqp://test:test@192.168.246.131:5672 -export ES_SERVER=192.168.246.131:9200 - -echo -n "this is object test8 version 1" | openssl dgst -sha256 -binary | base64 -curl 192.168.246.131:12347/objects/test8 -XPUT -d"this is object test8 version 1" -H "Digest: SHA-256=2IJQkIth94IVsnPQMrsNxz1oqfrsPo0E2ZmZfJLDZnE=" - -echo -n "this is object test8 version 2-6" | openssl dgst -sha256 -binary | base64 -curl 192.168.246.131:12347/objects/test8 -XPUT -d"this is object test8 version 2-6" -H "Digest: SHA-256=66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA=" -curl 192.168.246.131:12347/objects/test8 -XPUT -d"this is object test8 version 2-6" -H "Digest: SHA-256=66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA=" -curl 192.168.246.131:12347/objects/test8 -XPUT -d"this is object test8 version 2-6" -H "Digest: SHA-256=66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA=" -curl 192.168.246.131:12347/objects/test8 -XPUT -d"this is object test8 version 2-6" -H "Digest: SHA-256=66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA=" -curl 192.168.246.131:12347/objects/test8 -XPUT -d"this is object test8 version 2-6" -H "Digest: SHA-256=66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA=" - -curl 192.168.246.131:12347/versions/test8 -curl 192.168.246.131:12347/objects/test8 -ls -l /tmp/?/objects - -go run ../deleteOldMetadata/deleteOldMetadata.go -curl 192.168.246.131:12347/versions/test8 - -STORAGE_ROOT=/tmp/1 LISTEN_ADDRESS=10.29.1.1:12345 go run ../deleteOrphanObject/deleteOrphanObject.go -STORAGE_ROOT=/tmp/2 LISTEN_ADDRESS=10.29.1.2:12345 go run ../deleteOrphanObject/deleteOrphanObject.go -STORAGE_ROOT=/tmp/3 LISTEN_ADDRESS=10.29.1.3:12345 go run ../deleteOrphanObject/deleteOrphanObject.go -STORAGE_ROOT=/tmp/4 LISTEN_ADDRESS=10.29.1.4:12345 go run ../deleteOrphanObject/deleteOrphanObject.go -STORAGE_ROOT=/tmp/5 LISTEN_ADDRESS=10.29.1.5:12345 go run ../deleteOrphanObject/deleteOrphanObject.go -STORAGE_ROOT=/tmp/6 LISTEN_ADDRESS=10.29.1.6:12345 go run ../deleteOrphanObject/deleteOrphanObject.go -ls -l /tmp/?/objects -ls -l /tmp/?/garbage - -rm /tmp/1/objects/66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA=.* -echo some_garbage > /tmp/2/objects/66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA=.* -ls -l /tmp/?/objects - -STORAGE_ROOT=/tmp/2 go run ../objectScanner/objectScanner.go -ls -l /tmp/?/objects diff --git a/test/tset5.sh b/test/tset5.sh deleted file mode 100644 index 0756bb2..0000000 --- a/test/tset5.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/bash - -echo -n "this object will be separate to 4+2 shards" | openssl dgst -sha256 -binary | base64 - -curl -v 192.168.246.131:12347/objects/test5 -XPUT -d "this object will be separate to 4+2 shards" -H "Digest: SHA-256=MBMxWHrPMsuOBaVYHkwScZQRyTRMQyiKp2oelpLZza8=" - -ls -ltr /tmp/?/objects -echo -curl 192.168.246.131:12347/objects/test5 -echo -curl 192.168.246.131:12347/locate/MBMxWHrPMsuOBaVYHkwScZQRyTRMQyiKp2oelpLZza8= - -rm /tmp/1/objects/MBMxWHrPMsuOBaVYHkwScZQRyTRMQyiKp2oelpLZza8=.* -echo some_garbage > /tmp/2/objects/MBMxWHrPMsuOBaVYHkwScZQRyTRMQyiKp2oelpLZza8=.* -ls -ltr /tmp/?/objects -echo -curl 192.168.246.131:12347/objects/test5 -echo -ls -ltr /tmp/?/objects