Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on Dec 20, 2024. It is now read-only.

Commit 122ac3c

Browse filesBrowse files
committed
refactor: refactor dfget/core with SupernodeLocator
Signed-off-by: lowzj <zj3142063@gmail.com>
1 parent 8a1b37a commit 122ac3c
Copy full SHA for 122ac3c

File tree

Expand file treeCollapse file tree

11 files changed

+280
-125
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

11 files changed

+280
-125
lines changed
Open diff view settings
Collapse file

‎cmd/dfget/app/root_test.go‎

Copy file name to clipboardExpand all lines: cmd/dfget/app/root_test.go
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type dfgetSuit struct {
3838

3939
func (suit *dfgetSuit) Test_initFlagsNoArguments() {
4040
initProperties()
41-
suit.Equal(cfg.Nodes, []string{"127.0.0.1:8002"})
41+
suit.Equal(cfg.Nodes, []string(nil))
4242
suit.Equal(cfg.LocalLimit, 20*rate.MB)
4343
suit.Equal(cfg.TotalLimit, rate.Rate(0))
4444
suit.Equal(cfg.Notbs, false)
Collapse file

‎dfget/config/config.go‎

Copy file name to clipboardExpand all lines: dfget/config/config.go
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,9 @@ type Properties struct {
9191

9292
// NewProperties creates a new properties with default values.
9393
func NewProperties() *Properties {
94+
// don't set Supernodes as default value, the SupernodeLocator will
95+
// do this in a better way.
9496
return &Properties{
95-
Supernodes: GetDefaultSupernodesValue(),
9697
LocalLimit: DefaultLocalLimit,
9798
MinRate: DefaultMinRate,
9899
ClientQueueSize: DefaultClientQueueSize,
Collapse file

‎dfget/core/core.go‎

Copy file name to clipboardExpand all lines: dfget/core/core.go
+36-24Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
p2pDown "github.com/dragonflyoss/Dragonfly/dfget/core/downloader/p2p_downloader"
3535
"github.com/dragonflyoss/Dragonfly/dfget/core/regist"
3636
"github.com/dragonflyoss/Dragonfly/dfget/core/uploader"
37+
"github.com/dragonflyoss/Dragonfly/dfget/locator"
3738
"github.com/dragonflyoss/Dragonfly/pkg/algorithm"
3839
"github.com/dragonflyoss/Dragonfly/pkg/constants"
3940
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
@@ -50,32 +51,33 @@ import (
5051
// Start function creates a new task and starts it to download file.
5152
func Start(cfg *config.Config) *errortypes.DfError {
5253
var (
53-
supernodeAPI = api.NewSupernodeAPI()
54-
register = regist.NewSupernodeRegister(cfg, supernodeAPI)
55-
err error
56-
result *regist.RegisterResult
54+
supernodeAPI = api.NewSupernodeAPI()
55+
supernodeLocator = locator.CreateLocator(cfg)
56+
register = regist.NewSupernodeRegister(cfg, supernodeAPI, supernodeLocator)
57+
err error
58+
result *regist.RegisterResult
5759
)
5860

5961
printer.Println(fmt.Sprintf("--%s-- %s",
6062
cfg.StartTime.Format(config.DefaultTimestampFormat), cfg.URL))
6163

62-
if err = prepare(cfg); err != nil {
64+
if err = prepare(cfg, supernodeLocator); err != nil {
6365
return errortypes.New(config.CodePrepareError, err.Error())
6466
}
6567

66-
if result, err = registerToSuperNode(cfg, register); err != nil {
68+
if result, err = registerToSuperNode(cfg, register, supernodeLocator); err != nil {
6769
return errortypes.New(config.CodeRegisterError, err.Error())
6870
}
6971

70-
if err = downloadFile(cfg, supernodeAPI, register, result); err != nil {
72+
if err = downloadFile(cfg, supernodeAPI, supernodeLocator, register, result); err != nil {
7173
return errortypes.New(config.CodeDownloadError, err.Error())
7274
}
7375

7476
return nil
7577
}
7678

7779
// prepare the RV-related information and create the corresponding files.
78-
func prepare(cfg *config.Config) (err error) {
80+
func prepare(cfg *config.Config, locator locator.SupernodeLocator) (err error) {
7981
printer.Printf("dfget version:%s", version.DFGetVersion)
8082
printer.Printf("workspace:%s", cfg.WorkHome)
8183
printer.Printf("sign:%s", cfg.Sign)
@@ -104,9 +106,8 @@ func prepare(cfg *config.Config) (err error) {
104106
}
105107
rv.DataDir = cfg.RV.SystemDataDir
106108

107-
cfg.Nodes = adjustSupernodeList(cfg.Nodes)
108109
if stringutils.IsEmptyStr(rv.LocalIP) {
109-
rv.LocalIP = checkConnectSupernode(cfg.Nodes)
110+
rv.LocalIP = checkConnectSupernode(locator)
110111
}
111112
rv.Cid = getCid(rv.LocalIP, cfg.Sign)
112113
rv.TaskFileName = getTaskFileName(rv.RealTarget, cfg.Sign)
@@ -124,7 +125,7 @@ func launchPeerServer(cfg *config.Config) error {
124125
return err
125126
}
126127

127-
func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister) (
128+
func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister, supernodeLocator locator.SupernodeLocator) (
128129
*regist.RegisterResult, error) {
129130
defer func() {
130131
if r := recover(); r != nil {
@@ -137,7 +138,7 @@ func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister)
137138
panic("user specified")
138139
}
139140

140-
if len(cfg.Nodes) == 0 {
141+
if supernodeLocator == nil || supernodeLocator.Size() == 0 {
141142
cfg.BackSourceReason = config.BackSourceReasonNodeEmpty
142143
panic("supernode empty")
143144
}
@@ -162,7 +163,7 @@ func registerToSuperNode(cfg *config.Config, register regist.SupernodeRegister)
162163
return result, nil
163164
}
164165

165-
func downloadFile(cfg *config.Config, supernodeAPI api.SupernodeAPI,
166+
func downloadFile(cfg *config.Config, supernodeAPI api.SupernodeAPI, locator locator.SupernodeLocator,
166167
register regist.SupernodeRegister, result *regist.RegisterResult) error {
167168
timeout := calculateTimeout(cfg)
168169

@@ -180,7 +181,7 @@ func downloadFile(cfg *config.Config, supernodeAPI api.SupernodeAPI,
180181
downloadTime := time.Since(cfg.StartTime).Seconds()
181182
// upload metrics to supernode only if pattern is p2p or cdn and result is not nil
182183
if cfg.Pattern != config.PatternSource && result != nil {
183-
reportMetrics(cfg, supernodeAPI, downloadTime, result.TaskID, success)
184+
reportMetrics(cfg, supernodeAPI, locator, downloadTime, result.TaskID, success)
184185
}
185186

186187
if success {
@@ -285,21 +286,26 @@ func adjustSupernodeList(nodes []string) []string {
285286
}
286287
}
287288

288-
func checkConnectSupernode(nodes []string) (localIP string) {
289+
func checkConnectSupernode(locator locator.SupernodeLocator) (localIP string) {
289290
var (
290291
e error
291292
)
292-
for _, n := range nodes {
293-
ip, port := netutils.GetIPAndPortFromNode(n, config.DefaultSupernodePort)
294-
if localIP, e = httputils.CheckConnect(ip, port, 1000); e == nil {
295-
return localIP
293+
if locator == nil {
294+
return ""
295+
}
296+
for _, group := range locator.All() {
297+
for _, n := range group.Nodes {
298+
if localIP, e = httputils.CheckConnect(n.IP, n.Port, 1000); e == nil {
299+
return localIP
300+
}
301+
logrus.Errorf("Connect to node:%s error: %v", n, e)
296302
}
297-
logrus.Errorf("Connect to node:%s error: %v", n, e)
298303
}
299304
return ""
300305
}
301306

302-
func reportMetrics(cfg *config.Config, supernodeAPI api.SupernodeAPI, downloadTime float64, taskID string, success bool) {
307+
func reportMetrics(cfg *config.Config, supernodeAPI api.SupernodeAPI, locator locator.SupernodeLocator,
308+
downloadTime float64, taskID string, success bool) {
303309
req := &types.TaskMetricsRequest{
304310
BacksourceReason: strconv.Itoa(cfg.BackSourceReason),
305311
IP: cfg.RV.LocalIP,
@@ -311,10 +317,16 @@ func reportMetrics(cfg *config.Config, supernodeAPI api.SupernodeAPI, downloadTi
311317
Success: success,
312318
TaskID: taskID,
313319
}
314-
for _, node := range cfg.Nodes {
315-
resp, err := supernodeAPI.ReportMetrics(node, req)
320+
node := locator.Get()
321+
if node == nil {
322+
return
323+
}
324+
nodeStr := node.String()
325+
// retry twice
326+
for i := 0; i < 2; i++ {
327+
resp, err := supernodeAPI.ReportMetrics(nodeStr, req)
316328
if err != nil {
317-
logrus.Errorf("failed to report metrics to supernode %s: %v", node, err)
329+
logrus.Errorf("failed to report metrics to supernode %s: %v", nodeStr, err)
318330
}
319331
if resp != nil && resp.IsSuccess() {
320332
return
Collapse file

‎dfget/core/core_test.go‎

Copy file name to clipboardExpand all lines: dfget/core/core_test.go
+15-7Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
. "github.com/dragonflyoss/Dragonfly/dfget/core/helper"
3434
"github.com/dragonflyoss/Dragonfly/dfget/core/regist"
3535
"github.com/dragonflyoss/Dragonfly/dfget/core/uploader"
36+
"github.com/dragonflyoss/Dragonfly/dfget/locator"
3637
"github.com/dragonflyoss/Dragonfly/pkg/algorithm"
3738

3839
"github.com/go-check/check"
@@ -68,18 +69,20 @@ func (s *CoreTestSuite) TestPrepare(c *check.C) {
6869
cfg := s.createConfig(buf)
6970
cfg.Output = filepath.Join(s.workHome, "test.output")
7071

71-
err := prepare(cfg)
72+
err := prepare(cfg, nil)
7273
fmt.Printf("%s\nerror:%v", buf.String(), err)
7374
}
7475

7576
func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) {
7677
cfg := s.createConfig(&bytes.Buffer{})
7778
m := new(MockSupernodeAPI)
7879
m.RegisterFunc = CreateRegisterFunc()
79-
register := regist.NewSupernodeRegister(cfg, m)
80+
nodeStr := "127.0.0.1:8002"
81+
snLocator, _ := locator.NewStaticLocatorFromStr("test", []string{nodeStr})
82+
register := regist.NewSupernodeRegister(cfg, m, snLocator)
8083

8184
var f = func(bc int, errIsNil bool, data *regist.RegisterResult) {
82-
res, e := registerToSuperNode(cfg, register)
85+
res, e := registerToSuperNode(cfg, register, snLocator)
8386
c.Assert(res == nil, check.Equals, data == nil)
8487
c.Assert(e == nil, check.Equals, errIsNil)
8588
c.Assert(cfg.BackSourceReason, check.Equals, bc)
@@ -88,18 +91,21 @@ func (s *CoreTestSuite) TestRegisterToSupernode(c *check.C) {
8891
}
8992
}
9093

94+
tmpGroup := snLocator.Group
95+
snLocator.Group = nil
9196
f(config.BackSourceReasonNodeEmpty, true, nil)
9297

9398
cfg.Pattern = config.PatternSource
9499
f(config.BackSourceReasonUserSpecified, true, nil)
95100

96101
uploader.SetupPeerServerExecutor(nil)
97102
cfg.Pattern = config.PatternP2P
98-
cfg.Nodes = []string{"x"}
103+
snLocator.Group = tmpGroup
104+
snLocator.Refresh()
99105
cfg.URL = "http://x.com"
100106
f(config.BackSourceReasonRegisterFail, true, nil)
101107

102-
cfg.Nodes = []string{"x"}
108+
snLocator.Refresh()
103109
cfg.URL = "http://taobao.com"
104110
cfg.BackSourceReason = config.BackSourceReasonNone
105111
}
@@ -131,11 +137,13 @@ func (s *CoreTestSuite) TestCheckConnectSupernode(c *check.C) {
131137
s.createConfig(buf)
132138

133139
nodes := []string{host}
134-
ip := checkConnectSupernode(nodes)
140+
l, _ := locator.NewStaticLocatorFromStr("test", nodes)
141+
ip := checkConnectSupernode(l)
135142
c.Assert(ip, check.Equals, "127.0.0.1")
136143

137144
buf.Reset()
138-
ip = checkConnectSupernode([]string{"127.0.0.2"})
145+
l, _ = locator.NewStaticLocatorFromStr("test", []string{"127.0.0.2"})
146+
ip = checkConnectSupernode(l)
139147
c.Assert(strings.Index(buf.String(), "Connect") > 0, check.Equals, true)
140148
c.Assert(ip, check.Equals, "")
141149
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.