diff --git a/.idea/.gitignore b/.idea/.gitignore
deleted file mode 100644
index 73f69e0..0000000
--- a/.idea/.gitignore
+++ /dev/null
@@ -1,8 +0,0 @@
-# Default ignored files
-/shelf/
-/workspace.xml
-# Datasource local storage ignored files
-/dataSources/
-/dataSources.local.xml
-# Editor-based HTTP Client requests
-/httpRequests/
diff --git a/.idea/CodeReadingNote.xml b/.idea/CodeReadingNote.xml
deleted file mode 100644
index 2a0133f..0000000
--- a/.idea/CodeReadingNote.xml
+++ /dev/null
@@ -1,7 +0,0 @@
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/ObjectStorage.iml b/.idea/ObjectStorage.iml
deleted file mode 100644
index c956989..0000000
--- a/.idea/ObjectStorage.iml
+++ /dev/null
@@ -1,8 +0,0 @@
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
deleted file mode 100644
index 28a804d..0000000
--- a/.idea/misc.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
deleted file mode 100644
index 638b027..0000000
--- a/.idea/modules.xml
+++ /dev/null
@@ -1,8 +0,0 @@
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
deleted file mode 100644
index 94a25f7..0000000
--- a/.idea/vcs.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
deleted file mode 100644
index c7a0bbe..0000000
--- a/.idea/workspace.xml
+++ /dev/null
@@ -1,317 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 1607694590815
-
-
- 1607694590815
-
-
- 1607778089100
-
-
-
- 1607778089100
-
-
- 1607858454909
-
-
-
- 1607858454909
-
-
- 1607929674883
-
-
-
- 1607929674883
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- true
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/README.md b/README.md
index df88038..fad261c 100644
--- a/README.md
+++ b/README.md
@@ -1,22 +1,2 @@
-# ObjectStorage 分布式对象存储项目开发
-
-## 简介
-项目主要分为apiServer端和dataServer端,apiServer向外提供restful接口,负责接受客户的连接请求。
-dataServer负责存储数据。rabbitmq用于apiServer和dataServer端之间心跳、文件定位等。Elasticearch用于存储元数据。
-
-## 主要特点
-### 数据校验和去重
-主要思想,计算文件的sha256哈希值,以哈希值代替文件名进行存储,元数据服务器存储的映射。客户上传数据时会将哈希作为http头部数据一起上传,服务器接受时会计算哈希,并对比。同时
-由于哈希的唯一性,重复数据只保留一份。
-### 数据冗余和即时修复
-用Reed Solomon纠删码进行冗余,每份数据分成若干份,每份存储在互不相同的结点上。读取的时候进行文件完整性检查和修复。
-修复
-
-### 断点续传
-支持断点下载和上传。下载时http头部包含偏移量。上传为了支持哈希校验,在上传完成后才开始校验。
-
-### gzip压缩
-节省空间,数据服务器存储存储压缩数据,下载时解压之后再发送出去。
-
-## 改进
-加入Nginx进行负载均衡
+# ObjectStorage
+分布式对象存储项目开发
diff --git a/apiServer/apiServer.go b/apiServer/apiServer.go
deleted file mode 100644
index 9dc93f5..0000000
--- a/apiServer/apiServer.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package main
-
-import (
- "ObjectStorage/apiServer/heartbeat"
- "ObjectStorage/apiServer/locate"
- "ObjectStorage/apiServer/object"
- "ObjectStorage/apiServer/temp"
- "ObjectStorage/apiServer/versions"
- "log"
- "net/http"
- "os"
-)
-
-func main() {
- go heartbeat.ListenHeartBeat()
- //相当于PUT上传数据
- http.HandleFunc("/objects/", object.Handler)
- //相当于GET请求数据
- http.HandleFunc("/locate/", locate.Handler)
- //Get版本所有信息,直接查询元数据服务器
- http.HandleFunc("/versions/", versions.Handler)
- //
- http.HandleFunc("/temp/", temp.Handler)
-
- log.Fatalln(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))
-}
diff --git a/apiServer/heartbeat/choose.go b/apiServer/heartbeat/choose.go
deleted file mode 100644
index 9d50ec5..0000000
--- a/apiServer/heartbeat/choose.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package heartbeat
-
-import "math/rand"
-
-func ChooseRandomDataServers(n int, exclude map[int]string) (ds []string) {
- candidates := make([]string, 0)
- //exclude the server which has the data
- reverseExcludeServers := make(map[string]int)
- for id, addr := range exclude {
- reverseExcludeServers[addr] = id
- }
- //get current data servers that online
- servers := GetDataServers()
- for _, s := range servers {
- //judge the server exclude or include
- _, excluded := reverseExcludeServers[s]
- if !excluded {
- candidates = append(candidates, s)
- }
- }
- length := len(candidates)
- if length < n {
- return
- }
- p := rand.Perm(length)
- for i := 0; i < n; i++ {
- ds = append(ds, candidates[p[i]])
- }
- return
-}
diff --git a/apiServer/heartbeat/heartbeat.go b/apiServer/heartbeat/heartbeat.go
deleted file mode 100644
index 6087e7b..0000000
--- a/apiServer/heartbeat/heartbeat.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package heartbeat
-
-import (
- "ObjectStorage/src/lib/rabbitmq"
- "os"
- "strconv"
- "sync"
- "time"
-)
-
-var dataServers = make(map[string]time.Time)
-var mutex sync.Mutex
-
-func ListenHeartBeat() {
- q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
- defer q.Close()
- q.Bind("apiServers")
- go removeExpiredDataServers()
- c := q.Consume()
- for msg := range c {
- dataServer, err := strconv.Unquote(string(msg.Body))
- if err != nil {
- panic(err)
- }
- mutex.Lock()
- dataServers[dataServer] = time.Now()
- mutex.Unlock()
- }
-}
-
-func removeExpiredDataServers() {
- for {
- time.Sleep(5 * time.Second)
- mutex.Lock()
- for s, t := range dataServers {
- if t.Add(10 * time.Second).Before(time.Now()) {
- delete(dataServers, s)
- }
- }
- mutex.Unlock()
- }
-}
-func GetDataServers() []string {
- mutex.Lock()
- defer mutex.Unlock()
- servers := make([]string, 0)
- for s, _ := range dataServers {
- servers = append(servers, s)
- }
- return servers
-}
diff --git a/apiServer/locate/handler.go b/apiServer/locate/handler.go
deleted file mode 100644
index 1310b5d..0000000
--- a/apiServer/locate/handler.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package locate
-
-import (
- "encoding/json"
- "net/http"
- "strings"
-)
-
-func Handler(w http.ResponseWriter, r *http.Request) {
- m := r.Method
- if m != http.MethodGet {
- w.WriteHeader(http.StatusMethodNotAllowed)
- return
- }
- info := Locate(strings.Split(r.URL.EscapedPath(), "/")[2])
- if len(info) == 0 {
- w.WriteHeader(http.StatusNotFound)
- return
- }
- b, _ := json.Marshal(info)
- w.Write(b)
-}
diff --git a/apiServer/locate/locate.go b/apiServer/locate/locate.go
deleted file mode 100644
index efeffd5..0000000
--- a/apiServer/locate/locate.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package locate
-
-import (
- "ObjectStorage/src/lib/rabbitmq"
- "ObjectStorage/src/lib/rs"
- "ObjectStorage/src/lib/types"
- "encoding/json"
- "os"
- "time"
-)
-
-func Locate(name string) map[int]string {
- q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
- q.Publish("dataServers", name)
- c := q.Consume()
- go func() {
- time.Sleep(1 * time.Second)
- q.Close()
- }()
-
- locateInfo := make(map[int]string)
- for i := 0; i < rs.ALL_SHARDS; i++ {
- msg := <-c
- if len(msg.Body) == 0 {
- return locateInfo
- }
- var info types.LocateMessage
- json.Unmarshal(msg.Body, &info)
- locateInfo[info.Id] = info.Addr
- }
- return locateInfo
-}
-
-func Exist(name string) bool {
- return len(Locate(name)) >= rs.DATA_SHARDS
-}
diff --git a/apiServer/object/delete.go b/apiServer/object/delete.go
deleted file mode 100644
index 7ed6fb7..0000000
--- a/apiServer/object/delete.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package object
-
-import (
- "ObjectStorage/src/lib/es"
- "log"
- "net/http"
- "strings"
-)
-
-func del(w http.ResponseWriter, r *http.Request) {
- name := strings.Split(r.URL.EscapedPath(), "/")[2]
- //获取文件的最新元数据
- meta, err := es.SearchLatestVersion(name)
- if err != nil {
- log.Println(err)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- //获取元数据成功,添加新的元数据,标识此数据被删除
- err = es.PutMetadata(name, meta.Version+1, 0, "")
- if err != nil {
- log.Println(err)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
-
-}
diff --git a/apiServer/object/get.go b/apiServer/object/get.go
deleted file mode 100644
index 7582ff4..0000000
--- a/apiServer/object/get.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package object
-
-import (
- "ObjectStorage/src/lib/es"
- "ObjectStorage/src/lib/utils"
- "compress/gzip"
- "fmt"
- "io"
- "log"
- "net/http"
- "net/url"
- "strconv"
- "strings"
-)
-
-func get(w http.ResponseWriter, r *http.Request) {
- name := strings.Split(r.URL.EscapedPath(), "/")[2]
- versionId := r.URL.Query()["version"]
- version := 0
- var e error
- if len(versionId) != 0 {
- version, e = strconv.Atoi(versionId[0])
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusBadRequest)
- return
- }
- }
- meta, e := es.GetMetadata(name, version)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- if meta.Hash == "" {
- w.WriteHeader(http.StatusNotFound)
- return
- }
- hash := url.PathEscape(meta.Hash)
- stream, e := GetStream(hash, meta.Size)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusNotFound)
- return
- }
- offset := utils.GetOffsetFromHeader(r.Header)
- if offset != 0 {
- stream.Seek(offset, io.SeekCurrent)
- w.Header().Set("content-range", fmt.Sprintf("bytes %d-%d/%d", offset, meta.Size-1, meta.Size))
- w.WriteHeader(http.StatusPartialContent)
- }
- acceptGzip := false
- encoding := r.Header["Accept-Encoding"]
- for i := range encoding {
- if encoding[i] == "gzip" {
- acceptGzip = true
- break
- }
- }
- if acceptGzip {
- w.Header().Set("content-encoding", "gzip")
- w2 := gzip.NewWriter(w)
- io.Copy(w2, stream)
- w2.Close()
- } else {
- io.Copy(w, stream)
- }
- stream.Close()
-}
diff --git a/apiServer/object/get_stream.go b/apiServer/object/get_stream.go
deleted file mode 100644
index 4747d2b..0000000
--- a/apiServer/object/get_stream.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package object
-
-import (
- "ObjectStorage/apiServer/heartbeat"
- "ObjectStorage/apiServer/locate"
- _ "ObjectStorage/src/lib/objectstream"
- "ObjectStorage/src/lib/rs"
- "fmt"
- _ "io"
-)
-
-func GetStream(hash string, size int64) (*rs.RSGetStream, error) {
- locateInfo := locate.Locate(hash)
- //data loss completely
- if len(locateInfo) < rs.DATA_SHARDS {
- return nil, fmt.Errorf("object %s locate fail,result is %v", hash, locateInfo)
- }
- //data loss partly,choose some other data servers
- dataServers := make([]string, 0)
- if len(locateInfo) != rs.ALL_SHARDS {
- dataServers = heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS-len(locateInfo), locateInfo)
- }
- return rs.NewRSGetStream(locateInfo, dataServers, hash, size)
-}
diff --git a/apiServer/object/handler.go b/apiServer/object/handler.go
deleted file mode 100644
index aeefcf3..0000000
--- a/apiServer/object/handler.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package object
-
-import "net/http"
-
-func Handler(w http.ResponseWriter, r *http.Request) {
- m := r.Method
- if m == http.MethodPut {
- put(w, r)
- return
- }
- if m == http.MethodGet {
- get(w, r)
- return
- }
- if m == http.MethodDelete {
- del(w, r)
- return
- }
- if m == http.MethodPost {
- post(w, r)
- return
- }
- w.WriteHeader(http.StatusMethodNotAllowed)
-}
diff --git a/apiServer/object/post.go b/apiServer/object/post.go
deleted file mode 100644
index cb8cffe..0000000
--- a/apiServer/object/post.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package object
-
-import (
- "ObjectStorage/apiServer/heartbeat"
- "ObjectStorage/apiServer/locate"
- "ObjectStorage/src/lib/es"
- "ObjectStorage/src/lib/rs"
- "ObjectStorage/src/lib/utils"
- "log"
- "net/http"
- "net/url"
- "strconv"
- "strings"
-)
-
-func post(w http.ResponseWriter, r *http.Request) {
- name := strings.Split(r.URL.EscapedPath(), "/")[2]
- size, e := strconv.ParseInt(r.Header.Get("size"), 0, 64)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusForbidden)
- return
- }
- hash := utils.GetHashFromHeader(r.Header)
- if hash == "" {
- log.Println("missing object hash in digest header")
- w.WriteHeader(http.StatusBadRequest)
- return
- }
- if locate.Exist(url.PathEscape(hash)) {
- e = es.AddVersion(name, hash, size)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- } else {
- w.WriteHeader(http.StatusOK)
- }
- return
- }
- ds := heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS, nil)
- if len(ds) != rs.ALL_SHARDS {
- log.Println("cannot find enough dataServer")
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- }
- stream, e := rs.NewRSResumablePutStream(ds, name, url.PathEscape(hash), size)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- w.Header().Set("location", "/temp/"+url.PathEscape(stream.ToToken()))
- w.WriteHeader(http.StatusCreated)
-}
diff --git a/apiServer/object/put.go b/apiServer/object/put.go
deleted file mode 100644
index 3e103ce..0000000
--- a/apiServer/object/put.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package object
-
-import (
- "ObjectStorage/src/lib/es"
- "ObjectStorage/src/lib/utils"
- "log"
- "net/http"
- "strings"
-)
-
-func put(w http.ResponseWriter, r *http.Request) {
- hash := utils.GetHashFromHeader(r.Header)
- if hash == "" {
- log.Println("missing object hash in digest header")
- w.WriteHeader(http.StatusBadRequest)
- return
- }
-
- size := utils.GetSizeFromHeader(r.Header)
- c, e := storeObject(r.Body, hash, size)
- if e != nil {
- log.Println(e)
- w.WriteHeader(c)
- return
- }
- if c != http.StatusOK {
- w.WriteHeader(c)
- return
- }
-
- name := strings.Split(r.URL.EscapedPath(), "/")[2]
- e = es.AddVersion(name, hash, size)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- }
-}
diff --git a/apiServer/object/put_stream.go b/apiServer/object/put_stream.go
deleted file mode 100644
index 639f1e8..0000000
--- a/apiServer/object/put_stream.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package object
-
-import (
- "ObjectStorage/apiServer/heartbeat"
- "ObjectStorage/src/lib/rs"
- "fmt"
-)
-
-func putStream(hash string, size int64) (*rs.RSPutStream, error) {
- servers := heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS, nil)
- if len(servers) != rs.ALL_SHARDS {
- return nil, fmt.Errorf("cannot find enough dataServer")
- }
-
- return rs.NewRSPutStream(servers, hash, size)
-}
diff --git a/apiServer/object/store.go b/apiServer/object/store.go
deleted file mode 100644
index 0a460f2..0000000
--- a/apiServer/object/store.go
+++ /dev/null
@@ -1,31 +0,0 @@
-package object
-
-import (
- "ObjectStorage/apiServer/locate"
- "ObjectStorage/src/lib/utils"
- "fmt"
- "io"
- "net/http"
- "net/url"
-)
-
-func storeObject(r io.Reader, hash string, size int64) (int, error) {
- //locate time consume 1s,because our rabbitmq timeout is 1s.
- if locate.Exist(url.PathEscape(hash)) {
- return http.StatusOK, nil
- }
-
- stream, e := putStream(url.PathEscape(hash), size)
- if e != nil {
- return http.StatusInternalServerError, e
- }
-
- reader := io.TeeReader(r, stream)
- d := utils.CalculateHash(reader)
- if d != hash {
- stream.Commit(false)
- return http.StatusBadRequest, fmt.Errorf("object hash mismatch, calculated=%s, requested=%s", d, hash)
- }
- stream.Commit(true)
- return http.StatusOK, nil
-}
diff --git a/apiServer/temp/handler.go b/apiServer/temp/handler.go
deleted file mode 100644
index e506263..0000000
--- a/apiServer/temp/handler.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package temp
-
-import "net/http"
-
-func Handler(w http.ResponseWriter, r *http.Request) {
- method := r.Method
- if method == http.MethodPut {
- put(w, r)
- return
- }
- if method == http.MethodHead {
- head(w, r)
- return
- }
- w.WriteHeader(http.StatusMethodNotAllowed)
-}
diff --git a/apiServer/temp/head.go b/apiServer/temp/head.go
deleted file mode 100644
index 0d67297..0000000
--- a/apiServer/temp/head.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package temp
-
-import (
- "ObjectStorage/src/lib/rs"
- "fmt"
- "log"
- "net/http"
- "strings"
-)
-
-func head(w http.ResponseWriter, r *http.Request) {
- token := strings.Split(r.URL.EscapedPath(), "/")[2]
- stream, e := rs.NewRSResumablePutStreamFromToken(token)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusForbidden)
- return
- }
- current := stream.CurrentSize()
- if current == -1 {
- w.WriteHeader(http.StatusNotFound)
- return
- }
- w.Header().Set("content-length", fmt.Sprintf("%d", current))
-}
diff --git a/apiServer/temp/put.go b/apiServer/temp/put.go
deleted file mode 100644
index 86e3a44..0000000
--- a/apiServer/temp/put.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package temp
-
-import (
- "ObjectStorage/apiServer/locate"
- "ObjectStorage/src/lib/es"
- "ObjectStorage/src/lib/rs"
- "ObjectStorage/src/lib/utils"
- "io"
- "log"
- "net/http"
- "net/url"
- "strings"
-)
-
-func put(w http.ResponseWriter, r *http.Request) {
- token := strings.Split(r.URL.EscapedPath(), "/")[2]
- stream, e := rs.NewRSResumablePutStreamFromToken(token)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusForbidden)
- return
- }
- current := stream.CurrentSize()
- if current == -1 {
- w.WriteHeader(http.StatusNotFound)
- return
- }
- offset := utils.GetOffsetFromHeader(r.Header)
- if current != offset {
- w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
- return
- }
- bytes := make([]byte, rs.BLOCK_SIZE)
- for {
- n, e := io.ReadFull(r.Body, bytes)
- if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- current += int64(n)
- if current > stream.Size {
- stream.Commit(false)
- log.Println("resumable put exceed size")
- w.WriteHeader(http.StatusForbidden)
- return
- }
- if n != rs.BLOCK_SIZE && current != stream.Size {
- return
- }
- stream.Write(bytes[:n])
- if current == stream.Size {
- stream.Flush()
- getStream, e := rs.NewRSResumableGetStream(stream.Servers, stream.Uuids, stream.Size)
- hash := url.PathEscape(utils.CalculateHash(getStream))
- if hash != stream.Hash {
- stream.Commit(false)
- log.Println("resumable put done but hash mismatch")
- w.WriteHeader(http.StatusForbidden)
- return
- }
- if locate.Exist(url.PathEscape(hash)) {
- stream.Commit(false)
- } else {
- stream.Commit(true)
- }
- e = es.AddVersion(stream.Name, stream.Hash, stream.Size)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- }
- return
- }
- }
-}
diff --git a/apiServer/versions/handler.go b/apiServer/versions/handler.go
deleted file mode 100644
index 14ce733..0000000
--- a/apiServer/versions/handler.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package versions
-
-import (
- "ObjectStorage/src/lib/es"
- "encoding/json"
- "log"
- "net/http"
- "strings"
-)
-
-func Handler(w http.ResponseWriter, r *http.Request) {
- m := r.Method
- if m != http.MethodGet {
- w.WriteHeader(http.StatusMethodNotAllowed)
- return
- }
- from := 0
- size := 1000
- name := strings.Split(r.URL.EscapedPath(), "/")[2]
- for {
- metas, err := es.SearchAllVersions(name, from, size)
- if err != nil {
- log.Println(err)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- for i := range metas {
- b, _ := json.Marshal(metas[i])
- w.Write(b)
- w.Write([]byte("\n"))
-
- }
- if len(metas) != size {
- return
- }
- from += size
- }
-}
diff --git a/createfile.sh b/createfile.sh
deleted file mode 100644
index bd0127a..0000000
--- a/createfile.sh
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/bin/bash
-
-for i in `seq 1 6`
-do
- mkdir -p /tmp/$i/objects
- mkdir -p /tmp/$i/temp
-done
diff --git a/dataServer/ObjectsRoot/objects/test b/dataServer/ObjectsRoot/objects/test
deleted file mode 100644
index c577e48..0000000
--- a/dataServer/ObjectsRoot/objects/test
+++ /dev/null
@@ -1 +0,0 @@
-this is a test object
\ No newline at end of file
diff --git a/dataServer/dataServer.go b/dataServer/dataServer.go
deleted file mode 100644
index f6b720f..0000000
--- a/dataServer/dataServer.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package main
-
-import (
- "ObjectStorage/dataServer/heartbeat"
- "ObjectStorage/dataServer/locate"
- "ObjectStorage/dataServer/object"
- "ObjectStorage/dataServer/temp"
- "log"
- "net/http"
- "os"
-)
-
-func main() {
- locate.CollectObjects()
- //发送心跳
- go heartbeat.StartHeartBeat()
- //接收定位消息
- go locate.StartLocate()
- //若定位成功,则开始真正的接收和发送文件
- http.HandleFunc("/objects/", object.Handler)
- http.HandleFunc("/temp/", temp.Handler)
- err := http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil)
- if err != nil {
- log.Fatal("listen error", err)
- }
-}
diff --git a/dataServer/heartbeat/heartbeat.go b/dataServer/heartbeat/heartbeat.go
deleted file mode 100644
index e01a7bb..0000000
--- a/dataServer/heartbeat/heartbeat.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package heartbeat
-
-import (
- "ObjectStorage/src/lib/rabbitmq"
- "os"
- "time"
-)
-
-func StartHeartBeat() {
- q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
- defer q.Close()
- for {
- q.Publish("apiServers", os.Getenv("LISTEN_ADDRESS"))
- time.Sleep(5 * time.Second)
- }
-}
diff --git a/dataServer/locate/locate.go b/dataServer/locate/locate.go
deleted file mode 100644
index 93b3dbb..0000000
--- a/dataServer/locate/locate.go
+++ /dev/null
@@ -1,68 +0,0 @@
-package locate
-
-import (
- "ObjectStorage/src/lib/rabbitmq"
- "ObjectStorage/src/lib/types"
- "os"
- "path/filepath"
- "strconv"
- "strings"
- "sync"
-)
-
-var objects = make(map[string]int)
-var mutex sync.Mutex
-
-func Locate(hash string) int {
- mutex.Lock()
- id, ok := objects[hash]
- mutex.Unlock()
- if !ok {
- return -1
- }
- return id
-}
-func Add(hash string, id int) {
- mutex.Lock()
- objects[hash] = id
- mutex.Unlock()
-}
-
-func Del(hash string) {
- mutex.Lock()
- delete(objects, hash)
- mutex.Unlock()
-}
-
-func StartLocate() {
- q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
- defer q.Close()
- q.Bind("dataServers")
- c := q.Consume()
- for msg := range c {
- hash, err := strconv.Unquote(string(msg.Body))
- if err != nil {
- panic(err)
- }
- id := Locate(hash)
- if id != -1 {
- q.Send(msg.ReplyTo, types.LocateMessage{Addr: os.Getenv("LISTEN_ADDRESS"), Id: id})
- }
- }
-}
-
-func CollectObjects() {
- files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*")
- for i := range files {
- file := strings.Split(filepath.Base(files[i]), ".")
- if len(file) != 3 {
- panic(files[i])
- }
- hash := file[0]
- id, err := strconv.Atoi(file[1])
- if err != nil {
- panic(err)
- }
- objects[hash] = id
- }
-}
diff --git a/dataServer/object/object.go b/dataServer/object/object.go
deleted file mode 100644
index 06687e3..0000000
--- a/dataServer/object/object.go
+++ /dev/null
@@ -1,84 +0,0 @@
-package object
-
-import (
- "ObjectStorage/dataServer/locate"
- "compress/gzip"
- "crypto/sha256"
- "encoding/base64"
- "io"
- "log"
- "net/http"
- "net/url"
- "os"
- "path/filepath"
- "strings"
-)
-
-func Handler(w http.ResponseWriter, r *http.Request) {
- m := r.Method
- if m == http.MethodGet {
- get(w, r)
- return
- }
- if m == http.MethodDelete {
- del(w, r)
- return
- }
- w.WriteHeader(http.StatusMethodNotAllowed)
-}
-
-func get(w http.ResponseWriter, r *http.Request) {
- file := getFile(strings.Split(r.URL.EscapedPath(), "/")[2])
- if file == "" {
- w.WriteHeader(http.StatusNotFound)
- return
- }
- sendFile(w, file)
-}
-
-func getFile(name string) string {
- fileName := os.Getenv("STORAGE_ROOT") + "/objects/" + name + ".*"
- files, _ := filepath.Glob(fileName)
- if len(files) != 1 {
- locate.Del(strings.Split(name, ".")[0])
- return ""
- }
- file := files[0]
- h := sha256.New()
- sendFile(h, file)
- shardHash := url.PathEscape(base64.StdEncoding.EncodeToString(h.Sum(nil)))
- hash := strings.Split(file, ".")[2]
- if shardHash != hash {
- log.Println("object hash mismatched, remove", file)
- locate.Del(hash)
- os.Remove(file)
- return ""
- }
- return file
-}
-
-func sendFile(w io.Writer, file string) {
- f, e := os.Open(file)
- if e != nil {
- log.Println(e)
- return
- }
- defer f.Close()
- gzipStream, e := gzip.NewReader(f)
- if e != nil {
- log.Println(e)
- return
- }
- io.Copy(w, gzipStream)
- gzipStream.Close()
-}
-
-func del(w http.ResponseWriter, r *http.Request) {
- hash := strings.Split(r.URL.EscapedPath(), "/")[2]
- files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/" + hash + ".*")
- if len(files) != 1 {
- return
- }
- locate.Del(hash)
- os.Rename(files[0], os.Getenv("STORAGE_ROOT")+"/garbage/"+filepath.Base(files[0]))
-}
diff --git a/dataServer/temp/commit.go b/dataServer/temp/commit.go
deleted file mode 100644
index e4e6c3e..0000000
--- a/dataServer/temp/commit.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package temp
-
-import (
- "ObjectStorage/dataServer/locate"
- "ObjectStorage/src/lib/utils"
- "compress/gzip"
- "io"
- "net/url"
- "os"
- "strconv"
- "strings"
-)
-
-func (t *tempInfo) hash() string {
- return strings.Split(t.Name, ".")[0]
-}
-
-func (t *tempInfo) id() int {
- s := strings.Split(t.Name, ".")
- id, _ := strconv.Atoi(s[1])
- return id
-}
-
-func commitTempObject(datFile string, tempinfo *tempInfo) {
- f, _ := os.Open(datFile)
- shardHash := url.PathEscape(utils.CalculateHash(f))
- defer f.Close()
- f.Seek(0, io.SeekStart)
- w, _ := os.Create(os.Getenv("STORAGE_ROOT") + "/objects/" + tempinfo.Name + "." + shardHash)
- w2 := gzip.NewWriter(w)
- io.Copy(w2, f)
- w2.Close()
- locate.Add(tempinfo.hash(), tempinfo.id())
- os.Remove(datFile)
- os.Remove(strings.Split(datFile, ".")[0])
-}
diff --git a/dataServer/temp/delete.go b/dataServer/temp/delete.go
deleted file mode 100644
index 59150b9..0000000
--- a/dataServer/temp/delete.go
+++ /dev/null
@@ -1,15 +0,0 @@
-package temp
-
-import (
- "net/http"
- "os"
- "strings"
-)
-
-func del(w http.ResponseWriter, r *http.Request) {
- uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
- infoFile := os.Getenv("STORAGE_ROOT") + "/temp/" + uuid
- datFile := infoFile + ".dat"
- os.Remove(infoFile)
- os.Remove(datFile)
-}
diff --git a/dataServer/temp/get.go b/dataServer/temp/get.go
deleted file mode 100644
index 8e9b1af..0000000
--- a/dataServer/temp/get.go
+++ /dev/null
@@ -1,21 +0,0 @@
-package temp
-
-import (
- "io"
- "log"
- "net/http"
- "os"
- "strings"
-)
-
-func get(w http.ResponseWriter, r *http.Request) {
- uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
- f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/temp/" + uuid + ".dat")
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusNotFound)
- return
- }
- defer f.Close()
- io.Copy(w, f)
-}
diff --git a/dataServer/temp/handler.go b/dataServer/temp/handler.go
deleted file mode 100644
index 29c7a0b..0000000
--- a/dataServer/temp/handler.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package temp
-
-import "net/http"
-
-func Handler(w http.ResponseWriter, r *http.Request) {
- m := r.Method
- if m == http.MethodPut {
- put(w, r)
- return
- }
- if m == http.MethodPatch {
- patch(w, r)
- return
- }
- if m == http.MethodPost {
- post(w, r)
- return
- }
- if m == http.MethodDelete {
- del(w, r)
- return
- }
- if m == http.MethodHead {
- head(w, r)
- return
- }
- if m == http.MethodGet {
- get(w, r)
- return
- }
- w.WriteHeader(http.StatusMethodNotAllowed)
-}
diff --git a/dataServer/temp/head.go b/dataServer/temp/head.go
deleted file mode 100644
index 755f8c4..0000000
--- a/dataServer/temp/head.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package temp
-
-import (
- "fmt"
- "log"
- "net/http"
- "os"
- "strings"
-)
-
-func head(w http.ResponseWriter, r *http.Request) {
- uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
- f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/temp/" + uuid + ".dat")
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusNotFound)
- return
- }
- defer f.Close()
- info, e := f.Stat()
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- w.Header().Set("content-length", fmt.Sprintf("%d", info.Size()))
-}
diff --git a/dataServer/temp/patch.go b/dataServer/temp/patch.go
deleted file mode 100644
index 8a577b5..0000000
--- a/dataServer/temp/patch.go
+++ /dev/null
@@ -1,61 +0,0 @@
-package temp
-
-import (
- "encoding/json"
- "io"
- "io/ioutil"
- "log"
- "net/http"
- "os"
- "strings"
-)
-
-func patch(w http.ResponseWriter, r *http.Request) {
- uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
- tempinfo, e := readFromFile(uuid)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusNotFound)
- return
- }
- infoFile := os.Getenv("STORAGE_ROOT") + "/temp/" + uuid
- datFile := infoFile + ".dat"
- f, e := os.OpenFile(datFile, os.O_WRONLY|os.O_APPEND, 0)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- defer f.Close()
- _, e = io.Copy(f, r.Body)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- info, e := f.Stat()
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- actual := info.Size()
- if actual > tempinfo.Size {
- os.Remove(datFile)
- os.Remove(infoFile)
- log.Println("actual size", actual, "exceeds", tempinfo.Size)
- w.WriteHeader(http.StatusInternalServerError)
- }
-}
-
-func readFromFile(uuid string) (*tempInfo, error) {
- f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/temp/" + uuid)
- if e != nil {
- return nil, e
- }
- defer f.Close()
- b, _ := ioutil.ReadAll(f)
- var info tempInfo
- json.Unmarshal(b, &info)
- return &info, nil
-}
diff --git a/dataServer/temp/post.go b/dataServer/temp/post.go
deleted file mode 100644
index 7173c9d..0000000
--- a/dataServer/temp/post.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package temp
-
-import (
- "encoding/json"
- "log"
- "net/http"
- "os"
- "os/exec"
- "strconv"
- "strings"
-)
-
-type tempInfo struct {
- Uuid string
- Name string
- Size int64
-}
-
-func post(w http.ResponseWriter, r *http.Request) {
- output, _ := exec.Command("uuidgen").Output()
- uuid := strings.TrimSuffix(string(output), "\n")
- name := strings.Split(r.URL.EscapedPath(), "/")[2]
- size, e := strconv.ParseInt(r.Header.Get("size"), 0, 64)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- t := tempInfo{Uuid: uuid, Name: name, Size: size}
- e = t.writeToFile()
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- os.Create(os.Getenv("STORAGE_ROOT") + "/temp/" + t.Uuid + ".dat")
- w.Write([]byte(uuid))
-}
-
-func (t *tempInfo) writeToFile() error {
- f, e := os.Create(os.Getenv("STORAGE_ROOT") + "/temp/" + t.Uuid)
- if e != nil {
- return e
- }
- defer f.Close()
- b, _ := json.Marshal(t)
- f.Write(b)
- return nil
-}
diff --git a/dataServer/temp/put.go b/dataServer/temp/put.go
deleted file mode 100644
index a2ef0d3..0000000
--- a/dataServer/temp/put.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package temp
-
-import (
- "log"
- "net/http"
- "os"
- "strings"
-)
-
-func put(w http.ResponseWriter, r *http.Request) {
- uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
- tempinfo, e := readFromFile(uuid)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusNotFound)
- return
- }
- infoFile := os.Getenv("STORAGE_ROOT") + "/temp/" + uuid
- datFile := infoFile + ".dat"
- f, e := os.Open(datFile)
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- defer f.Close()
- info, e := f.Stat()
- if e != nil {
- log.Println(e)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- actual := info.Size()
- os.Remove(infoFile)
- if actual != tempinfo.Size {
- os.Remove(datFile)
- log.Println("actual size mismatch, expect", tempinfo.Size, "actual", actual)
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- commitTempObject(datFile, tempinfo)
-}
diff --git a/deletefile.sh b/deletefile.sh
deleted file mode 100644
index 4a651d4..0000000
--- a/deletefile.sh
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/bin/bash
-
-for i in `seq 1 6`
-do
- rm -rf /tmp/$i/objects
- rm -rf /tmp/$i/temp
-done
\ No newline at end of file
diff --git a/go.mod b/go.mod
deleted file mode 100644
index 20397be..0000000
--- a/go.mod
+++ /dev/null
@@ -1,9 +0,0 @@
-module ObjectStorage
-
-go 1.14
-
-require (
- github.com/klauspost/reedsolomon v1.9.9
- github.com/mmcloughlin/avo v0.0.0-20201130012700-45c8ae10fd12 // indirect
- github.com/streadway/amqp v1.0.0
-)
diff --git a/go.sum b/go.sum
deleted file mode 100644
index b5c4a63..0000000
--- a/go.sum
+++ /dev/null
@@ -1,34 +0,0 @@
-github.com/klauspost/cpuid v1.2.4 h1:EBfaK0SWSwk+fgk6efYFWdzl8MwRWoOO1gkmiaTXPW4=
-github.com/klauspost/cpuid v1.2.4/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
-github.com/klauspost/reedsolomon v1.9.9 h1:qCL7LZlv17xMixl55nq2/Oa1Y86nfO8EqDfv2GHND54=
-github.com/klauspost/reedsolomon v1.9.9/go.mod h1:O7yFFHiQwDR6b2t63KPUpccPtNdp5ADgh1gg4fd12wo=
-github.com/mmcloughlin/avo v0.0.0-20201130012700-45c8ae10fd12 h1:JJvkIBIdkzz71+2UD6CHfjDC2O3fCZJ98KUaB70gr00=
-github.com/mmcloughlin/avo v0.0.0-20201130012700-45c8ae10fd12/go.mod h1:6aKT4zZIrpGqB3RpFU14ByCSSyKY6LfJz4J/JJChHfI=
-github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
-github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
-github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
-golang.org/x/arch v0.0.0-20201008161808-52c3e6f60cff/go.mod h1:flIaEI6LNU6xOCD5PaJvn9wGP0agmIOqjrtsKGRguv4=
-golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
-golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
-golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
-golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
-golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
-golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
-golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
-golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
-golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174 h1:0rx0F4EjJNbxTuzWe0KjKcIzs+3VEb/Mrs/d1ciNz1c=
-golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
-golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
-golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
diff --git a/hash b/hash
deleted file mode 100644
index 0632cf5..0000000
--- a/hash
+++ /dev/null
@@ -1 +0,0 @@
-b3BlbnNzbCBkZ3N0IC1zaGEyNTYgLWJpbmFyeSAvdG1wL2ZpbGUK
diff --git a/initenv.sh b/initenv.sh
deleted file mode 100644
index 1d38faa..0000000
--- a/initenv.sh
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/bin/bash
-
-for i in `seq 1 6`
-do
- mkdir -p /tmp/$i/objects
- mkdir -p /tmp/$i/temp
- mkdir -p /tmp/$i/garbage
-done
-
-A
-A
-A
-A
-A
-A
-A
-done
diff --git a/repairTools/deleteOldMetadata/deleteOldMetadata.go b/repairTools/deleteOldMetadata/deleteOldMetadata.go
deleted file mode 100644
index 87083c6..0000000
--- a/repairTools/deleteOldMetadata/deleteOldMetadata.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package main
-
-import (
- "ObjectStorage/src/lib/es"
- "log"
-)
-
-const MIN_VERSION_COUNT = 5
-
-func main() {
- buckets, e := es.SearchVersionStatus(MIN_VERSION_COUNT + 1)
- if e != nil {
- log.Println(e)
- return
- }
- log.Println(len(buckets))
- for i := range buckets {
- bucket := buckets[i]
- for v := 0; v < bucket.Doc_count-MIN_VERSION_COUNT; v++ {
- es.DelMetadata(bucket.Key, v+int(bucket.Min_version.Value))
- }
- }
-}
diff --git a/repairTools/deleteOrphanObject/deleteOrphanObject.go b/repairTools/deleteOrphanObject/deleteOrphanObject.go
deleted file mode 100644
index a63e094..0000000
--- a/repairTools/deleteOrphanObject/deleteOrphanObject.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package main
-
-import (
- "ObjectStorage/src/lib/es"
- "log"
- "net/http"
- "os"
- "path/filepath"
- "strings"
-)
-
-func main() {
- files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*")
-
- for i := range files {
- hash := strings.Split(filepath.Base(files[i]), ".")[0]
- hashInMetadata, e := es.HasHash(hash)
- if e != nil {
- log.Println(e)
- return
- }
- if !hashInMetadata {
- del(hash)
- }
- }
-}
-
-func del(hash string) {
- log.Println("delete", hash)
- url := "http://" + os.Getenv("LISTEN_ADDRESS") + "/objects/" + hash
- request, _ := http.NewRequest("DELETE", url, nil)
- client := http.Client{}
- client.Do(request)
-}
diff --git a/repairTools/objectScanner/objectScanner.go b/repairTools/objectScanner/objectScanner.go
deleted file mode 100644
index 62d2aa6..0000000
--- a/repairTools/objectScanner/objectScanner.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package main
-
-import (
- "ObjectStorage/apiServer/object"
- "ObjectStorage/src/lib/es"
- "ObjectStorage/src/lib/utils"
- "log"
- "os"
- "path/filepath"
- "strings"
-)
-
-func main() {
- files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*")
-
- for i := range files {
- hash := strings.Split(filepath.Base(files[i]), ".")[0]
- verify(hash)
- }
-}
-
-func verify(hash string) {
- log.Println("verify", hash)
- size, e := es.SearchHashSize(hash)
- if e != nil {
- log.Println(e)
- return
- }
- stream, e := object.GetStream(hash, size)
- if e != nil {
- log.Println(e)
- return
- }
- d := utils.CalculateHash(stream)
- if d != hash {
- log.Printf("object hash mismatch, calculated=%s, requested=%s", d, hash)
- }
- stream.Close()
-}
diff --git a/src/lib/es/es.go b/src/lib/es/es.go
deleted file mode 100644
index 23cad6a..0000000
--- a/src/lib/es/es.go
+++ /dev/null
@@ -1,209 +0,0 @@
-package es
-
-import (
- "encoding/json"
- "fmt"
- "io/ioutil"
- "net/http"
- "os"
- "strings"
-)
-
-type Metadata struct {
- Name string `json:"name"`
- Version int `json:"version"`
- Size int64 `json:"size"`
- Hash string `json:"hash"`
-}
-
-type Bucket struct {
- Key string
- Doc_count int
- Min_version struct {
- Value float32
- }
-}
-
-type aggregateResult struct {
- Aggregations struct {
- Group_by_name struct {
- Buckets []Bucket
- }
- }
-}
-
-func getMetadata(name string, versionId int) (meta Metadata, err error) {
- url := fmt.Sprintf("http://%s/metadata/objects/%s_%d/_source", os.Getenv("ES_SERVER"), name, versionId)
- r, err := http.Get(url)
- if err != nil {
- return
- }
- if r.StatusCode != http.StatusOK {
- err = fmt.Errorf("failed to get %s_%d", name, versionId)
- return
- }
- //获取到文件元数据
- result, _ := ioutil.ReadAll(r.Body)
- json.Unmarshal(result, &meta)
- return
-}
-
-type hit struct {
- Source Metadata `json:"_source"`
-}
-type searchResult struct {
- Hits struct {
- Total int
- Hits []hit
- }
-}
-
-func SearchLatestVersion(name string) (meta Metadata, err error) {
- url := fmt.Sprintf("http://%s/metadata/_search?q=name:%s&size=1&sort=version:desc", os.Getenv("ES_SERVER"), name)
- r, err := http.Get(url)
- if err != nil {
- return
- }
- if r.StatusCode != http.StatusOK {
- err = fmt.Errorf("fialed to find %s latest version", name)
- return
- }
- result, _ := ioutil.ReadAll(r.Body)
- var sr searchResult
- json.Unmarshal(result, &sr)
- if len(sr.Hits.Hits) != 0 {
- meta = sr.Hits.Hits[0].Source
-
- }
- return
-}
-func GetMetadata(name string, version int) (meta Metadata, err error) {
- if version == 0 {
- return SearchLatestVersion(name)
- }
- return getMetadata(name, version)
-}
-
-func PutMetadata(name string, version int, size int64, hash string) error {
- doc := fmt.Sprintf(`{"name":"%s","version":%d,"size":%d,"hash":"%s"}`, name, version, size, hash)
- client := http.Client{}
- url := fmt.Sprintf("http://%s/metadata/objects/%s_%d?op_type=create", os.Getenv("ES_SERVER"), name, version)
- request, _ := http.NewRequest("PUT", url, strings.NewReader(doc))
- request.Header.Add("Content-Type", "application/json")
- request.Header.Add("datatype", "json")
- r, err := client.Do(request)
- if err != nil {
- return err
- }
- //如果多个客户端同时上传一个元数据,只有一个成功,其他的会增加版本号继续递归上传,直到成功
- if r.StatusCode == http.StatusConflict {
- return PutMetadata(name, version+1, size, hash)
- }
- if r.StatusCode != http.StatusCreated {
- result, _ := ioutil.ReadAll(r.Body)
- return fmt.Errorf("failed to put metadata : %d %s", r.StatusCode, string(result))
- }
- return nil
-}
-
-func AddVersion(name, hash string, size int64) error {
- version, e := SearchLatestVersion(name)
- if e != nil {
- return e
- }
- return PutMetadata(name, version.Version+1, size, hash)
-}
-
-func SearchAllVersions(name string, from, size int) ([]Metadata, error) {
- //构造请求url
- url := fmt.Sprintf("http://%s/metadata/objects/_search?sort=name,version&from=%d&size=%d", os.Getenv("ES_SERVER"), from, size)
- if name != "" {
- url += "&q=name:" + name
- }
- fmt.Println(url)
- r, err := http.Get(url)
- if err != nil {
- return nil, err
- }
- metas := make([]Metadata, 0)
- result, _ := ioutil.ReadAll(r.Body)
- var sr searchResult
- json.Unmarshal(result, &sr)
- for i := range sr.Hits.Hits {
- metas = append(metas, sr.Hits.Hits[i].Source)
- }
- return metas, nil
-}
-
-func SearchVersionStatus(min_doc_count int) ([]Bucket, error) {
- client := http.Client{}
- url := fmt.Sprintf("http://%s/metadata/_search", os.Getenv("ES_SERVER"))
- body := fmt.Sprintf(`
- {
- "size": 0,
- "aggs": {
- "group_by_name": {
- "terms": {
- "field": "name",
- "min_doc_count": "%d"
- },
- "aggs": {
- "min_version": {
- "min": {
- "field": "version"
- }
- }
- }
- }
- }
- }`, min_doc_count)
- request, _ := http.NewRequest("GET", url, strings.NewReader(body))
- r, e := client.Do(request)
- if e != nil {
- return nil, e
- }
- b, _ := ioutil.ReadAll(r.Body)
- var ar aggregateResult
- json.Unmarshal(b, &ar)
- return ar.Aggregations.Group_by_name.Buckets, nil
-}
-
-func DelMetadata(name string, version int) {
- client := http.Client{}
- url := fmt.Sprintf("http://%s/metadata/objects/%s_%d",
- os.Getenv("ES_SERVER"), name, version)
- request, _ := http.NewRequest("DELETE", url, nil)
- client.Do(request)
-}
-
-func HasHash(hash string) (bool, error) {
- url := fmt.Sprintf("http://%s/metadata/_search?q=hash:%s&size=0", os.Getenv("ES_SERVER"), hash)
- r, e := http.Get(url)
- if e != nil {
- return false, e
- }
- b, _ := ioutil.ReadAll(r.Body)
- var sr searchResult
- json.Unmarshal(b, &sr)
- return sr.Hits.Total != 0, nil
-}
-
-func SearchHashSize(hash string) (size int64, e error) {
- url := fmt.Sprintf("http://%s/metadata/_search?q=hash:%s&size=1",
- os.Getenv("ES_SERVER"), hash)
- r, e := http.Get(url)
- if e != nil {
- return
- }
- if r.StatusCode != http.StatusOK {
- e = fmt.Errorf("fail to search hash size: %d", r.StatusCode)
- return
- }
- result, _ := ioutil.ReadAll(r.Body)
- var sr searchResult
- json.Unmarshal(result, &sr)
- if len(sr.Hits.Hits) != 0 {
- size = sr.Hits.Hits[0].Source.Size
- }
- return
-}
diff --git a/src/lib/objectstream/get.go b/src/lib/objectstream/get.go
deleted file mode 100644
index 14ccc37..0000000
--- a/src/lib/objectstream/get.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package objectstream
-
-import (
- "fmt"
- "io"
- "net/http"
-)
-
-type GetStream struct {
- reader io.Reader
-}
-
-func (r *GetStream) Read(p []byte) (n int, err error) {
- return r.reader.Read(p)
-}
-func newGetStream(url string) (*GetStream, error) {
- r, err := http.Get(url)
- if err != nil {
- return nil, err
- }
- if r.StatusCode != http.StatusOK {
- return nil, fmt.Errorf("dataServer return http code %d", r.StatusCode)
- }
- return &GetStream{r.Body}, nil
-}
-
-func NewGetStream(server string, object string) (*GetStream, error) {
- if server == "" || object == "" {
- return nil, fmt.Errorf("invalid server %s object %s", server, object)
- }
- return newGetStream("http://" + server + "/objects/" + object)
-}
diff --git a/src/lib/objectstream/put.go b/src/lib/objectstream/put.go
deleted file mode 100644
index ef8ab50..0000000
--- a/src/lib/objectstream/put.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package objectstream
-
-import (
- "fmt"
- "io"
- "net/http"
-)
-
-type PutStream struct {
- writer *io.PipeWriter
- c chan error
-}
-
-func (w *PutStream) Write(p []byte) (n int, err error) {
- return w.writer.Write(p)
-}
-func (w *PutStream) Close() error {
- w.writer.Close()
- //close执行后http cleint请求会发送出去
- //将HTTPclient doRequest结果返回
- return <-w.c
-}
-
-func NewPutStream(server, object string) *PutStream {
- reader, writer := io.Pipe()
- c := make(chan error)
- go func() {
- request, _ := http.NewRequest("PUT", "http://"+server+"/objects/"+object, reader)
- client := http.Client{}
- r, e := client.Do(request)
- if e == nil && r.StatusCode != http.StatusOK {
- e = fmt.Errorf("dataServer return http code %d", r.StatusCode)
- }
- c <- e
- }()
- return &PutStream{writer, c}
-}
diff --git a/src/lib/objectstream/temp.go b/src/lib/objectstream/temp.go
deleted file mode 100644
index 8bf4a2e..0000000
--- a/src/lib/objectstream/temp.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package objectstream
-
-import (
- "fmt"
- "io/ioutil"
- "log"
- "net/http"
- "strings"
-)
-
-type TempPutStream struct {
- Server string
- Uuid string
-}
-
-func NewTempPutStream(server, object string, size int64) (*TempPutStream, error) {
- request, e := http.NewRequest("POST", "http://"+server+"/temp/"+object, nil)
- if e != nil {
- return nil, e
- }
- request.Header.Set("size", fmt.Sprintf("%d", size))
- client := http.Client{}
- response, e := client.Do(request)
- if e != nil {
- return nil, e
- }
- uuid, e := ioutil.ReadAll(response.Body)
- if e != nil {
- return nil, e
- }
- return &TempPutStream{server, string(uuid)}, nil
-}
-
-func (w *TempPutStream) Write(p []byte) (n int, err error) {
- request, e := http.NewRequest("PATCH", "http://"+w.Server+"/temp/"+w.Uuid, strings.NewReader(string(p)))
- if e != nil {
- return 0, e
- }
- client := http.Client{}
- r, e := client.Do(request)
- if e != nil {
- return 0, e
- }
- if r.StatusCode != http.StatusOK {
- return 0, fmt.Errorf("dataServer return http code %d", r.StatusCode)
- }
- return len(p), nil
-}
-
-func (w *TempPutStream) Commit(good bool) {
- log.Println("commit status:", good)
- method := http.MethodDelete
- if good {
- method = http.MethodPut
- }
- request, _ := http.NewRequest(method, "http://"+w.Server+"/temp/"+w.Uuid, nil)
- client := http.Client{}
- client.Do(request)
-}
-
-func NewTempGetStream(server, uuid string) (*GetStream, error) {
- return newGetStream("http://" + server + "/temp/" + uuid)
-}
diff --git a/src/lib/rabbitmq/rabbitmq.go b/src/lib/rabbitmq/rabbitmq.go
deleted file mode 100644
index 238fd4e..0000000
--- a/src/lib/rabbitmq/rabbitmq.go
+++ /dev/null
@@ -1,112 +0,0 @@
-package rabbitmq
-
-import (
- "encoding/json"
- "github.com/streadway/amqp"
-)
-
-type RabbitMQ struct {
- channel *amqp.Channel
- conn *amqp.Connection
- Name string
- exchange string
-}
-
-func New(s string) *RabbitMQ {
- conn, e := amqp.Dial(s)
- if e != nil {
- panic(e)
- }
-
- ch, e := conn.Channel()
- if e != nil {
- panic(e)
- }
-
- q, e := ch.QueueDeclare(
- "", // name
- false, // durable
- true, // delete when unused
- false, // exclusive
- false, // no-wait
- nil, // arguments
- )
- if e != nil {
- panic(e)
- }
-
- mq := new(RabbitMQ)
- mq.channel = ch
- mq.conn = conn
- mq.Name = q.Name
- return mq
-}
-
-func (q *RabbitMQ) Bind(exchange string) {
- e := q.channel.QueueBind(
- q.Name, // queue name
- "", // routing key
- exchange, // exchange
- false,
- nil)
- if e != nil {
- panic(e)
- }
- q.exchange = exchange
-}
-
-func (q *RabbitMQ) Send(queue string, body interface{}) {
- str, e := json.Marshal(body)
- if e != nil {
- panic(e)
- }
- e = q.channel.Publish("",
- queue,
- false,
- false,
- amqp.Publishing{
- ReplyTo: q.Name,
- Body: []byte(str),
- })
- if e != nil {
- panic(e)
- }
-}
-
-func (q *RabbitMQ) Publish(exchange string, body interface{}) {
- str, e := json.Marshal(body)
- if e != nil {
- panic(e)
- }
- e = q.channel.Publish(exchange,
- "",
- false,
- false,
- amqp.Publishing{
- ReplyTo: q.Name,
- Body: []byte(str),
- })
- if e != nil {
- panic(e)
- }
-}
-
-func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
- c, e := q.channel.Consume(q.Name,
- "",
- true,
- false,
- false,
- false,
- nil,
- )
- if e != nil {
- panic(e)
- }
- return c
-}
-
-func (q *RabbitMQ) Close() {
- q.channel.Close()
- q.conn.Close()
-}
diff --git a/src/lib/rabbitmq/rabbitmq_test.go b/src/lib/rabbitmq/rabbitmq_test.go
deleted file mode 100644
index e438f45..0000000
--- a/src/lib/rabbitmq/rabbitmq_test.go
+++ /dev/null
@@ -1,92 +0,0 @@
-package rabbitmq
-
-import (
- "encoding/json"
- "testing"
-)
-
-const host = "amqp://test:test@10.29.102.173:5672"
-
-func TestPublish(t *testing.T) {
- q := New(host)
- defer q.Close()
- q.Bind("test")
-
- q2 := New(host)
- defer q2.Close()
- q2.Bind("test")
-
- q3 := New(host)
- defer q3.Close()
-
- expect := "test"
- q3.Publish("test2", "any")
- q3.Publish("test", expect)
-
- c := q.Consume()
- msg := <-c
- var actual interface{}
- err := json.Unmarshal(msg.Body, &actual)
- if err != nil {
- t.Error(err)
- }
- if actual != expect {
- t.Errorf("expected %s, actual %s", expect, actual)
- }
- if msg.ReplyTo != q3.Name {
- t.Error(msg)
- }
-
- c2 := q2.Consume()
- msg = <-c2
- err = json.Unmarshal(msg.Body, &actual)
- if err != nil {
- t.Error(err)
- }
- if actual != expect {
- t.Errorf("expected %s, actual %s", expect, actual)
- }
- if msg.ReplyTo != q3.Name {
- t.Error(msg)
- }
- q2.Send(msg.ReplyTo, "test3")
- c3 := q3.Consume()
- msg = <-c3
- if string(msg.Body) != `"test3"` {
- t.Error(string(msg.Body))
- }
-}
-
-func TestSend(t *testing.T) {
- q := New(host)
- defer q.Close()
-
- q2 := New(host)
- defer q2.Close()
-
- expect := "test"
- expect2 := "test2"
- q2.Send(q.Name, expect)
- q2.Send(q2.Name, expect2)
-
- c := q.Consume()
- msg := <-c
- var actual interface{}
- err := json.Unmarshal(msg.Body, &actual)
- if err != nil {
- t.Error(err)
- }
- if actual != expect {
- t.Errorf("expected %s, actual %s", expect, actual)
- }
-
- c2 := q2.Consume()
- msg = <-c2
- err = json.Unmarshal(msg.Body, &actual)
- if err != nil {
- t.Error(err)
- }
- if actual != expect2 {
- t.Errorf("expected %s, actual %s", expect2, actual)
- }
-}
diff --git a/src/lib/rs/common.go b/src/lib/rs/common.go
deleted file mode 100644
index d417c07..0000000
--- a/src/lib/rs/common.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package rs
-
-const (
- DATA_SHARDS = 4
- PARITY_SHARDS = 2
- ALL_SHARDS = DATA_SHARDS + PARITY_SHARDS
- BLOCK_PER_SHARD = 8000
- BLOCK_SIZE = BLOCK_PER_SHARD * DATA_SHARDS
-)
diff --git a/src/lib/rs/decoder.go b/src/lib/rs/decoder.go
deleted file mode 100644
index bb33878..0000000
--- a/src/lib/rs/decoder.go
+++ /dev/null
@@ -1,84 +0,0 @@
-package rs
-
-import (
- "ObjectStorage/src/lib/objectstream"
- "github.com/klauspost/reedsolomon"
- "io"
- "log"
-)
-
-type decoder struct {
- readers []io.Reader
- writers []io.Writer
- enc reedsolomon.Encoder
- size int64
- cache []byte
- cacheSize int
- total int64
-}
-
-func NewDecoder(readers []io.Reader, writers []io.Writer, size int64) *decoder {
- enc, _ := reedsolomon.New(DATA_SHARDS, PARITY_SHARDS)
- return &decoder{readers, writers, enc, size, nil, 0, 0}
-}
-
-func (d *decoder) Read(p []byte) (n int, err error) {
- if d.cacheSize == 0 {
- e := d.getData()
- if e != nil {
- return 0, e
- }
- }
- length := len(p)
- if d.cacheSize < length {
- length = d.cacheSize
- }
- d.cacheSize -= length
- copy(p, d.cache[:length])
- d.cache = d.cache[length:]
- return length, nil
-}
-
-func (d *decoder) getData() error {
- if d.total == d.size {
- return io.EOF
- }
- shards := make([][]byte, ALL_SHARDS)
- repairIds := make([]int, 0)
- for i := range shards {
- if d.readers[i] == nil {
- repairIds = append(repairIds, i)
- } else {
- shards[i] = make([]byte, BLOCK_PER_SHARD)
- n, e := io.ReadFull(d.readers[i], shards[i])
- if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF {
- shards[i] = nil
- } else if n != BLOCK_PER_SHARD {
- shards[i] = shards[i][:n]
- }
- }
- }
- e := d.enc.Reconstruct(shards)
- if e != nil {
- return e
- }
- for i := range repairIds {
- id := repairIds[i]
- _, err := d.writers[id].Write(shards[id])
- if err != nil {
- d.writers[id].(*objectstream.TempPutStream).Commit(false)
- log.Println(err)
- }
- d.writers[id].(*objectstream.TempPutStream).Commit(true)
- }
- for i := 0; i < DATA_SHARDS; i++ {
- shardSize := int64(len(shards[i]))
- if d.total+shardSize > d.size {
- shardSize -= d.total + shardSize - d.size
- }
- d.cache = append(d.cache, shards[i][:shardSize]...)
- d.cacheSize += int(shardSize)
- d.total += shardSize
- }
- return nil
-}
diff --git a/src/lib/rs/encoder.go b/src/lib/rs/encoder.go
deleted file mode 100644
index 7eb8f4c..0000000
--- a/src/lib/rs/encoder.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package rs
-
-import (
- "github.com/klauspost/reedsolomon"
- "io"
-)
-
-type encoder struct {
- writers []io.Writer
- enc reedsolomon.Encoder
- cache []byte
-}
-
-func NewEncoder(writers []io.Writer) *encoder {
- enc, _ := reedsolomon.New(DATA_SHARDS, PARITY_SHARDS)
- return &encoder{writers, enc, nil}
-}
-
-func (e *encoder) Write(p []byte) (n int, err error) {
- length := len(p)
- current := 0
- for length != 0 {
- next := BLOCK_SIZE - len(e.cache)
- if next > length {
- next = length
- }
- e.cache = append(e.cache, p[current:current+next]...)
- if len(e.cache) == BLOCK_SIZE {
- e.Flush()
- }
- current += next
- length -= next
- }
- return len(p), nil
-}
-
-func (e *encoder) Flush() {
- if len(e.cache) == 0 {
- return
- }
- shards, _ := e.enc.Split(e.cache)
- e.enc.Encode(shards)
- for i := range shards {
- e.writers[i].Write(shards[i])
- }
- e.cache = []byte{}
-}
diff --git a/src/lib/rs/get.go b/src/lib/rs/get.go
deleted file mode 100644
index a628d1e..0000000
--- a/src/lib/rs/get.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package rs
-
-import (
- "ObjectStorage/src/lib/objectstream"
- "fmt"
- "io"
-)
-
-type RSGetStream struct {
- *decoder
-}
-
-//get every shards
-func NewRSGetStream(locateInfo map[int]string, dataServers []string, hash string, size int64) (*RSGetStream, error) {
- if len(locateInfo)+len(dataServers) != ALL_SHARDS {
- return nil, fmt.Errorf("dataServers number is not match!")
- }
- readers := make([]io.Reader, ALL_SHARDS)
- for i := 0; i < ALL_SHARDS; i++ {
- server := locateInfo[i]
- if server == "" {
- locateInfo[i] = dataServers[0]
- dataServers = dataServers[1:]
- continue
- }
- //if server is not empty,read data from the server
- reader, err := objectstream.NewGetStream(locateInfo[i], fmt.Sprintf("%s.%d", hash, i))
- if err == nil {
- //get data shard i success
- readers[i] = reader
- }
- }
- writers := make([]io.Writer, ALL_SHARDS)
- perShards := (size + DATA_SHARDS - 1) / DATA_SHARDS
- var e error
- for i := range readers {
- if readers[i] == nil {
- writers[i], e = objectstream.NewTempPutStream(locateInfo[i], fmt.Sprintf("%s.%d", hash, i), perShards)
- if e != nil {
- return nil, e
- }
-
- }
- }
- dec := NewDecoder(readers, writers, size)
- return &RSGetStream{dec}, nil
-}
-
-func (s *RSGetStream) Close() {
- for i := range s.writers {
- if s.writers[i] != nil {
- s.writers[i].(*objectstream.TempPutStream).Commit(true)
- }
- }
-}
-
-func (s *RSGetStream) Seek(offset int64, whence int64) (int64, error) {
- if whence != io.SeekCurrent {
- panic("only support seekcurrent!")
- }
- if offset < 0 {
- panic("only support forward seek")
- }
- length := int64(BLOCK_SIZE)
- for offset != 0 {
- if offset > length {
- offset = length
- }
- buf := make([]byte, length)
- io.ReadFull(s, buf)
- offset -= length
- }
- return offset, nil
-}
diff --git a/src/lib/rs/put.go b/src/lib/rs/put.go
deleted file mode 100644
index aa0bfb3..0000000
--- a/src/lib/rs/put.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package rs
-
-import (
- "ObjectStorage/src/lib/objectstream"
- "fmt"
- "io"
-)
-
-type RSPutStream struct {
- *encoder
-}
-
-func NewRSPutStream(dataServers []string, hash string, size int64) (*RSPutStream, error) {
- if len(dataServers) != ALL_SHARDS {
- return nil, fmt.Errorf("dataServers number mismatch")
- }
-
- perShard := (size + DATA_SHARDS - 1) / DATA_SHARDS
- writers := make([]io.Writer, ALL_SHARDS)
- var e error
- for i := range writers {
- writers[i], e = objectstream.NewTempPutStream(dataServers[i],
- fmt.Sprintf("%s.%d", hash, i), perShard)
- if e != nil {
- return nil, e
- }
- }
- enc := NewEncoder(writers)
-
- return &RSPutStream{enc}, nil
-}
-
-func (s *RSPutStream) Commit(success bool) {
- s.Flush()
- for i := range s.writers {
- s.writers[i].(*objectstream.TempPutStream).Commit(success)
- }
-}
diff --git a/src/lib/rs/resumable_get.go b/src/lib/rs/resumable_get.go
deleted file mode 100644
index efa5276..0000000
--- a/src/lib/rs/resumable_get.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package rs
-
-import (
- "ObjectStorage/src/lib/objectstream"
- "io"
-)
-
-type RSResumableGetStream struct {
- *decoder
-}
-
-func NewRSResumableGetStream(dataServers []string, uuids []string, size int64) (*RSResumableGetStream, error) {
- readers := make([]io.Reader, ALL_SHARDS)
- var e error
- for i := 0; i < ALL_SHARDS; i++ {
- readers[i], e = objectstream.NewTempGetStream(dataServers[i], uuids[i])
- if e != nil {
- return nil, e
- }
- }
- writers := make([]io.Writer, ALL_SHARDS)
- dec := NewDecoder(readers, writers, size)
- return &RSResumableGetStream{dec}, nil
-}
diff --git a/src/lib/rs/resumable_put.go b/src/lib/rs/resumable_put.go
deleted file mode 100644
index d33f75b..0000000
--- a/src/lib/rs/resumable_put.go
+++ /dev/null
@@ -1,78 +0,0 @@
-package rs
-
-import (
- "ObjectStorage/src/lib/objectstream"
- "ObjectStorage/src/lib/utils"
- "encoding/base64"
- "encoding/json"
- "fmt"
- "io"
- "log"
- "net/http"
-)
-
-type resumableToken struct {
- Name string
- Size int64
- Hash string
- Servers []string
- Uuids []string
-}
-
-type RSResumablePutStream struct {
- *RSPutStream
- *resumableToken
-}
-
-func NewRSResumablePutStream(dataServers []string, name, hash string, size int64) (*RSResumablePutStream, error) {
- putStream, e := NewRSPutStream(dataServers, hash, size)
- if e != nil {
- return nil, e
- }
- uuids := make([]string, ALL_SHARDS)
- for i := range uuids {
- uuids[i] = putStream.writers[i].(*objectstream.TempPutStream).Uuid
- }
- token := &resumableToken{name, size, hash, dataServers, uuids}
- return &RSResumablePutStream{putStream, token}, nil
-}
-
-func NewRSResumablePutStreamFromToken(token string) (*RSResumablePutStream, error) {
- b, e := base64.StdEncoding.DecodeString(token)
- if e != nil {
- return nil, e
- }
- var t resumableToken
- e = json.Unmarshal(b, &t)
- if e != nil {
- return nil, e
- }
- writers := make([]io.Writer, ALL_SHARDS)
- for i := range writers {
- writers[i] = &objectstream.TempPutStream{Server: t.Servers[i], Uuid: t.Uuids[i]}
- }
- enc := NewEncoder(writers)
- return &RSResumablePutStream{&RSPutStream{enc}, &t}, nil
-}
-
-func (s *RSResumablePutStream) ToToken() string {
- b, _ := json.Marshal(s)
- return base64.StdEncoding.EncodeToString(b)
-}
-
-func (s *RSResumablePutStream) CurrentSize() int64 {
- r, e := http.Head(fmt.Sprintf("http://%s/temp/%s", s.Servers[0], s.Uuids[0]))
- if e != nil {
- log.Println(e)
- return -1
- }
- if r.StatusCode != http.StatusOK {
- log.Println(r.StatusCode)
- return -1
- }
- size := utils.GetSizeFromHeader(r.Header) * DATA_SHARDS
- if size > s.Size {
- size = s.Size
- }
- return size
-}
diff --git a/src/lib/types/types.go b/src/lib/types/types.go
deleted file mode 100644
index b89c9ba..0000000
--- a/src/lib/types/types.go
+++ /dev/null
@@ -1,6 +0,0 @@
-package types
-
-type LocateMessage struct {
- Addr string
- Id int
-}
diff --git a/src/lib/utils/utils.go b/src/lib/utils/utils.go
deleted file mode 100644
index 4342a37..0000000
--- a/src/lib/utils/utils.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package utils
-
-import (
- "crypto/sha256"
- "encoding/base64"
- "io"
- "net/http"
- "strconv"
- "strings"
-)
-
-func GetHashFromHeader(h http.Header) string {
- digest := h.Get("digest")
- if len(digest) < 9 {
- return ""
- }
- if digest[:8] != "SHA-256=" {
- return ""
- }
- return digest[8:]
-}
-
-func GetSizeFromHeader(h http.Header) int64 {
- size, _ := strconv.ParseInt(h.Get("content-length"), 0, 64)
- return size
-}
-
-func CalculateHash(r io.Reader) string {
- h := sha256.New()
- io.Copy(h, r)
- return base64.StdEncoding.EncodeToString(h.Sum(nil))
-}
-
-func GetOffsetFromHeader(h http.Header) int64 {
- byteRange := h.Get("range")
- if len(byteRange) < 7 {
- return 0
- }
- if byteRange[0:6] != "bytes=" {
- return 0
- }
- bytePos := strings.Split(byteRange[6:], "-")
- offset, _ := strconv.ParseInt(bytePos[0], 0, 64)
- return offset
-}
diff --git a/starttestenv.sh b/starttestenv.sh
deleted file mode 100644
index b2ea801..0000000
--- a/starttestenv.sh
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/bin/bash
-export RABBITMQ_SERVER=amqp://test:test@localhost:5672
-export ES_SERVER=localhost:9200
-LISTEN_ADDRESS=192.168.246.131:12341 STORAGE_ROOT=/tmp/1 go run ./dataServer/dataServer.go &
-LISTEN_ADDRESS=192.168.246.131:12342 STORAGE_ROOT=/tmp/2 go run ./dataServer/dataServer.go &
-LISTEN_ADDRESS=192.168.246.131:12343 STORAGE_ROOT=/tmp/3 go run ./dataServer/dataServer.go &
-LISTEN_ADDRESS=192.168.246.131:12344 STORAGE_ROOT=/tmp/4 go run ./dataServer/dataServer.go &
-LISTEN_ADDRESS=192.168.246.131:12345 STORAGE_ROOT=/tmp/5 go run ./dataServer/dataServer.go &
-LISTEN_ADDRESS=192.168.246.131:12346 STORAGE_ROOT=/tmp/6 go run ./dataServer/dataServer.go &
-
-LISTEN_ADDRESS=192.168.246.131:12347 go run ./apiServer/apiServer.go &
-LISTEN_ADDRESS=192.168.246.131:12348 go run ./apiServer/apiServer.go &
diff --git a/stoptestenv.sh b/stoptestenv.sh
deleted file mode 100755
index 4a131b3..0000000
--- a/stoptestenv.sh
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/bash
-
-killall apiServer
-killall dataServer
diff --git a/test/put.sh b/test/put.sh
deleted file mode 100755
index 09a3275..0000000
--- a/test/put.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-
-curl -v 192.168.246.129:12347/objects/test3 -XPUT -d"this is object test3"
-
-echo -n "this is object test3" | openssl dgst -sha256 -binary | base64
-curl -v 192.168.246.129:12347/objects/test3 -XPUT -d"this is object test3" -H "Digest: SHA-256=GYqqAdFPt+CScnUDc0/Gcu3kwcWmOADKNYpiZtdbgsM="
-
-curl 192.168.246.129:12347/objects/test3
-echo
-
-echo -n "this is object test3 version 2" | openssl dgst -sha256 -binary | base64
-curl -v 192.168.246.129:12347/objects/test3 -XPUT -d"this is object test3 version 2" -H "Digest: SHA-256=cAPvsxZe1PR54zIESQy0BaxC1pYJIvaHSF3qEOZYYIo="
-
-curl 192.168.246.129:12347/objects/test3
-echo
-
-curl 192.168.246.129:12347/objects/test3
-echo
-curl 192.168.246.129:12347/locate/GYqqAdFPt+CScnUDc0%2FGcu3kwcWmOADKNYpiZtdbgsM=
-echo
-curl 192.168.246.129:12347/locate/cAPvsxZe1PR54zIESQy0BaxC1pYJIvaHSF3qEOZYYIo=
-echo
-curl 192.168.246.129:12347/versions/test3
-echo
-curl 192.168.246.129:12347/objects/test3?version=1
-echo
-curl -v 192.168.246.129:12347/objects/test3 -XDELETE
-
-curl -v 192.168.246.129:12347/objects/test3
-echo
-
-curl 192.168.246.129:12347/versions/test3
-echo
-curl 192.168.246.129:12347/objects/test3?version=1
-echo
-curl 192.168.246.129:12347/objects/test3?version=2
-echo
diff --git a/test/putmapping.sh b/test/putmapping.sh
deleted file mode 100755
index 67aad5d..0000000
--- a/test/putmapping.sh
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/bin/bash
-
-
-#curl "localhost:9200/metadata" -XDELETE
-
-curl -H "Content-Type: application/json" "localhost:9200/metadata" -XPUT -d'{"mappings":{"objects":{"properties":{"name":{"type":"text","index":"true","fielddata": "true"},"version":{"type":"integer"},"size":{"type":"integer"},"hash":{"type":"text"}}}}}'
-
diff --git a/test/test4.sh b/test/test4.sh
deleted file mode 100644
index c27a6df..0000000
--- a/test/test4.sh
+++ /dev/null
@@ -1,19 +0,0 @@
-echo -n "this object will have only 1 instance" | openssl dgst -sha256 -binary | base64
-
-curl -v 192.168.246.130:12347/objects/test4_1 -XPUT -d "this object will have only 1 instance" -H "Digest: SHA-256=incorrecthash"
-
-curl -v 192.168.246.130:12347/objects/test4_1 -XPUT -d "this object will have only 1 instance" -H "Digest: SHA-256=aWKQ2BipX94sb+h3xdTbWYAu1yzjn5vyFG2SOwUQIXY="
-
-curl -v 192.168.246.130:12347/objects/test4_2 -XPUT -d "this object will have only 1 instance" -H "Digest: SHA-256=aWKQ2BipX94sb+h3xdTbWYAu1yzjn5vyFG2SOwUQIXY="
-
-ls -ltr /tmp/?/objects
-echo
-curl 192.168.246.130:12347/objects/test4_1
-echo
-curl 192.168.246.130:12347/objects/test4_2
-echo
-curl 192.168.246.130:12347/locate/aWKQ2BipX94sb+h3xdTbWYAu1yzjn5vyFG2SOwUQIXY=
-echo
-curl 192.168.246.130:12347/versions/test4_1
-echo
-curl 192.168.246.130:12347/versions/test4_2
\ No newline at end of file
diff --git a/test/test6.sh b/test/test6.sh
deleted file mode 100644
index a688a44..0000000
--- a/test/test6.sh
+++ /dev/null
@@ -1,35 +0,0 @@
-#!/bin/bash
-
-dd if=/dev/urandom of=/tmp/file bs=1000 count=100
-
-openssl dgst -sha256 -binary /tmp/file | base64
-
-dd if=/tmp/file of=/tmp/first bs=1000 count=50
-
-dd if=/tmp/file of=/tmp/second bs=1000 skip=32 count=68
-
-
-
-
-curl -v 192.168.246.131:12347/objects/test6 -XPOST -H "Digest: SHA-256=$1" -H "Size: 100000"
-
-
-curl -I 192.168.246.131:12347/$1
-
-curl -v -XPUT --data-binary @/tmp/first 192.168.246.131:12347/$1
-
-curl -I 192.168.246.131:12347/$1
-
-curl -v -XPUT --data-binary @/tmp/second -H "range: bytes=32000-" 192.168.246.131:12347/$1
-
-curl -I 192.168.246.131:12347/$1
-
-curl -v -XPUT --data-binary @/tmp/last -H "range: bytes=96000-" 192.168.246.131:12347/$1
-
-curl 192.168.246.131:12347/objects/test6 > /tmp/output
-
-diff -s /tmp/output /tmp/file
-
-curl 192.168.246.131:12347/objects/test6 -H "range: bytes=32000-" > /tmp/output2
-
-diff -s /tmp/output2 /tmp/second
\ No newline at end of file
diff --git a/test/test6/test6_1.sh b/test/test6/test6_1.sh
deleted file mode 100644
index 9058632..0000000
--- a/test/test6/test6_1.sh
+++ /dev/null
@@ -1,8 +0,0 @@
-#!/bin/bash
-
-dd if=/dev/urandom of=/tmp/file bs=1000 count=100
-
-base = $(base64 <<< openssl dgst -sha256 -binary <<< ($cat /tmp/file))
-
-echo $base
-curl -v 192.168.246.131:12347/objects/test6 -XPOST -H "Digest: SHA-256=${hash}" -H "Size: 100000"
\ No newline at end of file
diff --git a/test/test7/genFile.sh b/test/test7/genFile.sh
deleted file mode 100644
index 0ec9b12..0000000
--- a/test/test7/genFile.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/bin/bash
-
-dd if=/dev/zero of=/tmp/file bs=1M count=100
-
-openssl dgst -sha256 -binary /tmp/file | base64
\ No newline at end of file
diff --git a/test/test7/put.sh b/test/test7/put.sh
deleted file mode 100644
index 706c8f2..0000000
--- a/test/test7/put.sh
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/bin/bash
-
-curl -v 192.168.246.131:12347/objects/test7 -XPUT --data-binary @/tmp/file -H "Digest: SHA-256=IEkqTQ2E+L6xdn9mFiKfhdRMKCe2S9v7Jg7hL6EQng4="
-
-curl -v 192.168.246.131:12347/objects/test7 -o /tmp/output
-
-diff -s /tmp/output /tmp/file
-
-ls -ltr /tmp/?/objects
-
-curl -v 192.168.246.131:12347/objects/test7 -H "Accept-Encoding: gzip" -o /tmp/output2.gz
-
-gunzip /tmp/output2.gz
-
-diff -s /tmp/output2 /tmp/file
\ No newline at end of file
diff --git a/test/test8/createGarbage.sh b/test/test8/createGarbage.sh
deleted file mode 100644
index b55464d..0000000
--- a/test/test8/createGarbage.sh
+++ /dev/null
@@ -1,4 +0,0 @@
-for i in `seq 1 6`
-do
- mkdir -p /tmp/$i/garbage
-done
\ No newline at end of file
diff --git a/test/test8/push.sh b/test/test8/push.sh
deleted file mode 100644
index 17f20f3..0000000
--- a/test/test8/push.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-
-export RABBITMQ_SERVER=amqp://test:test@192.168.246.131:5672
-export ES_SERVER=192.168.246.131:9200
-
-echo -n "this is object test8 version 1" | openssl dgst -sha256 -binary | base64
-curl 192.168.246.131:12347/objects/test8 -XPUT -d"this is object test8 version 1" -H "Digest: SHA-256=2IJQkIth94IVsnPQMrsNxz1oqfrsPo0E2ZmZfJLDZnE="
-
-echo -n "this is object test8 version 2-6" | openssl dgst -sha256 -binary | base64
-curl 192.168.246.131:12347/objects/test8 -XPUT -d"this is object test8 version 2-6" -H "Digest: SHA-256=66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA="
-curl 192.168.246.131:12347/objects/test8 -XPUT -d"this is object test8 version 2-6" -H "Digest: SHA-256=66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA="
-curl 192.168.246.131:12347/objects/test8 -XPUT -d"this is object test8 version 2-6" -H "Digest: SHA-256=66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA="
-curl 192.168.246.131:12347/objects/test8 -XPUT -d"this is object test8 version 2-6" -H "Digest: SHA-256=66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA="
-curl 192.168.246.131:12347/objects/test8 -XPUT -d"this is object test8 version 2-6" -H "Digest: SHA-256=66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA="
-
-curl 192.168.246.131:12347/versions/test8
-curl 192.168.246.131:12347/objects/test8
-ls -l /tmp/?/objects
-
-go run ../deleteOldMetadata/deleteOldMetadata.go
-curl 192.168.246.131:12347/versions/test8
-
-STORAGE_ROOT=/tmp/1 LISTEN_ADDRESS=10.29.1.1:12345 go run ../deleteOrphanObject/deleteOrphanObject.go
-STORAGE_ROOT=/tmp/2 LISTEN_ADDRESS=10.29.1.2:12345 go run ../deleteOrphanObject/deleteOrphanObject.go
-STORAGE_ROOT=/tmp/3 LISTEN_ADDRESS=10.29.1.3:12345 go run ../deleteOrphanObject/deleteOrphanObject.go
-STORAGE_ROOT=/tmp/4 LISTEN_ADDRESS=10.29.1.4:12345 go run ../deleteOrphanObject/deleteOrphanObject.go
-STORAGE_ROOT=/tmp/5 LISTEN_ADDRESS=10.29.1.5:12345 go run ../deleteOrphanObject/deleteOrphanObject.go
-STORAGE_ROOT=/tmp/6 LISTEN_ADDRESS=10.29.1.6:12345 go run ../deleteOrphanObject/deleteOrphanObject.go
-ls -l /tmp/?/objects
-ls -l /tmp/?/garbage
-
-rm /tmp/1/objects/66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA=.*
-echo some_garbage > /tmp/2/objects/66WuRH0s0albWDZ9nTmjFo9JIqTTXmB6EiRkhTh1zeA=.*
-ls -l /tmp/?/objects
-
-STORAGE_ROOT=/tmp/2 go run ../objectScanner/objectScanner.go
-ls -l /tmp/?/objects
diff --git a/test/tset5.sh b/test/tset5.sh
deleted file mode 100644
index 0756bb2..0000000
--- a/test/tset5.sh
+++ /dev/null
@@ -1,19 +0,0 @@
-#!/bin/bash
-
-echo -n "this object will be separate to 4+2 shards" | openssl dgst -sha256 -binary | base64
-
-curl -v 192.168.246.131:12347/objects/test5 -XPUT -d "this object will be separate to 4+2 shards" -H "Digest: SHA-256=MBMxWHrPMsuOBaVYHkwScZQRyTRMQyiKp2oelpLZza8="
-
-ls -ltr /tmp/?/objects
-echo
-curl 192.168.246.131:12347/objects/test5
-echo
-curl 192.168.246.131:12347/locate/MBMxWHrPMsuOBaVYHkwScZQRyTRMQyiKp2oelpLZza8=
-
-rm /tmp/1/objects/MBMxWHrPMsuOBaVYHkwScZQRyTRMQyiKp2oelpLZza8=.*
-echo some_garbage > /tmp/2/objects/MBMxWHrPMsuOBaVYHkwScZQRyTRMQyiKp2oelpLZza8=.*
-ls -ltr /tmp/?/objects
-echo
-curl 192.168.246.131:12347/objects/test5
-echo
-ls -ltr /tmp/?/objects