Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit b118a4d

Browse filesBrowse files
committed
Merge branch '133-sync-port-allocation' into 'master'
fix: sync clone port allocation(#133) Closes #133 See merge request postgres-ai/database-lab!120
2 parents 093342a + 9c08c48 commit b118a4d
Copy full SHA for b118a4d

File tree

2 files changed

+94
-18
lines changed
Filter options

2 files changed

+94
-18
lines changed

‎pkg/services/provision/mode_local.go

Copy file name to clipboardExpand all lines: pkg/services/provision/mode_local.go
+38-18Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"path"
1212
"strconv"
1313
"strings"
14+
"sync"
15+
"sync/atomic"
1416
"time"
1517

1618
"github.com/docker/docker/api/types"
@@ -72,17 +74,18 @@ type provisionModeLocal struct {
7274
provision
7375
dockerClient *client.Client
7476
runner runners.Runner
77+
mu *sync.Mutex
7578
ports []bool
76-
sessionCounter uint
79+
sessionCounter uint32
7780
thinCloneManager thinclones.Manager
7881
}
7982

8083
// NewProvisionModeLocal creates a new Provision instance of ModeLocal.
8184
func NewProvisionModeLocal(ctx context.Context, config Config, dockerClient *client.Client) (Provision, error) {
8285
p := &provisionModeLocal{
83-
runner: runners.NewLocalRunner(config.ModeLocal.UseSudo),
84-
sessionCounter: 0,
85-
dockerClient: dockerClient,
86+
runner: runners.NewLocalRunner(config.ModeLocal.UseSudo),
87+
mu: &sync.Mutex{},
88+
dockerClient: dockerClient,
8689
provision: provision{
8790
config: config,
8891
ctx: ctx,
@@ -196,8 +199,7 @@ func (j *provisionModeLocal) StartSession(username, password, snapshotID string)
196199
return nil, errors.Wrap(err, "failed to get snapshots")
197200
}
198201

199-
// TODO(anatoly): Synchronization or port allocation statuses.
200-
port, err := j.getFreePort()
202+
port, err := j.allocatePort()
201203
if err != nil {
202204
return nil, errors.New("failed to get a free port")
203205
}
@@ -209,6 +211,10 @@ func (j *provisionModeLocal) StartSession(username, password, snapshotID string)
209211
defer func() {
210212
if err != nil {
211213
j.revertSession(name)
214+
215+
if portErr := j.freePort(port); portErr != nil {
216+
log.Err(portErr)
217+
}
212218
}
213219
}()
214220

@@ -227,12 +233,7 @@ func (j *provisionModeLocal) StartSession(username, password, snapshotID string)
227233
return nil, errors.Wrap(err, "failed to prepare a database")
228234
}
229235

230-
err = j.setPort(port, true)
231-
if err != nil {
232-
return nil, errors.Wrap(err, "failed to set a port")
233-
}
234-
235-
j.sessionCounter++
236+
atomic.AddUint32(&j.sessionCounter, 1)
236237

237238
appConfig := j.getAppConfig(name, port)
238239

@@ -263,7 +264,7 @@ func (j *provisionModeLocal) StopSession(session *resources.Session) error {
263264
return errors.Wrap(err, "failed to destroy a clone")
264265
}
265266

266-
err = j.setPort(session.Port, false)
267+
err = j.freePort(session.Port)
267268
if err != nil {
268269
return errors.Wrap(err, "failed to unbind a port")
269270
}
@@ -372,20 +373,39 @@ func (j *provisionModeLocal) initPortPool() error {
372373
return nil
373374
}
374375

375-
func (j *provisionModeLocal) getFreePort() (uint, error) {
376+
// allocatePort tries to find a free port and occupy it.
377+
func (j *provisionModeLocal) allocatePort() (uint, error) {
376378
portOpts := j.config.ModeLocal.PortPool
377379

380+
j.mu.Lock()
381+
defer j.mu.Unlock()
382+
378383
for index, binded := range j.ports {
379384
if !binded {
380385
port := portOpts.From + uint(index)
386+
387+
if err := j.setPortStatus(port, true); err != nil {
388+
return 0, errors.Wrapf(err, "failed to set status for port %v", port)
389+
}
390+
381391
return port, nil
382392
}
383393
}
384394

385395
return 0, errors.WithStack(NewNoRoomError("no available ports"))
386396
}
387397

388-
func (j *provisionModeLocal) setPort(port uint, bind bool) error {
398+
// freePort marks the port as free.
399+
func (j *provisionModeLocal) freePort(port uint) error {
400+
j.mu.Lock()
401+
defer j.mu.Unlock()
402+
403+
return j.setPortStatus(port, false)
404+
}
405+
406+
// setPortStatus updates the port status.
407+
// It's not safe to invoke without ports mutex locking. Use allocatePort and freePort methods.
408+
func (j *provisionModeLocal) setPortStatus(port uint, bind bool) error {
389409
portOpts := j.config.ModeLocal.PortPool
390410

391411
if port < portOpts.From || port >= portOpts.To {
@@ -399,14 +419,14 @@ func (j *provisionModeLocal) setPort(port uint, bind bool) error {
399419
}
400420

401421
func (j *provisionModeLocal) stopAllSessions() error {
402-
insts, err := postgres.List(j.runner, ClonePrefix)
422+
instances, err := postgres.List(j.runner, ClonePrefix)
403423
if err != nil {
404424
return errors.Wrap(err, "failed to list containers")
405425
}
406426

407-
log.Dbg("Containers running:", insts)
427+
log.Dbg("Containers running:", instances)
408428

409-
for _, inst := range insts {
429+
for _, inst := range instances {
410430
log.Dbg("Stopping container:", inst)
411431

412432
if err = postgres.Stop(j.runner, j.getAppConfig(inst, 0)); err != nil {
+56Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package provision
2+
3+
import (
4+
"sync"
5+
"testing"
6+
7+
"github.com/pkg/errors"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestPortAllocation(t *testing.T) {
13+
p := &provisionModeLocal{
14+
mu: &sync.Mutex{},
15+
provision: provision{
16+
config: Config{
17+
ModeLocal: ModeLocalConfig{
18+
PortPool: ModeLocalPortPool{
19+
From: 6000,
20+
To: 6002,
21+
},
22+
},
23+
},
24+
},
25+
}
26+
27+
// Initialize port pool.
28+
require.NoError(t, p.initPortPool())
29+
30+
// Allocate a new port.
31+
port, err := p.allocatePort()
32+
require.NoError(t, err)
33+
34+
assert.GreaterOrEqual(t, port, p.provision.config.ModeLocal.PortPool.From)
35+
assert.LessOrEqual(t, port, p.provision.config.ModeLocal.PortPool.To)
36+
37+
// Allocate one more port.
38+
_, err = p.allocatePort()
39+
require.NoError(t, err)
40+
41+
// Impossible allocate a new port.
42+
_, err = p.allocatePort()
43+
assert.IsType(t, errors.Cause(err), &NoRoomError{})
44+
assert.EqualError(t, err, "session cannot be started because there is no room: no available ports")
45+
46+
// Free port and allocate a new one.
47+
require.NoError(t, p.freePort(port))
48+
port, err = p.allocatePort()
49+
require.NoError(t, err)
50+
assert.GreaterOrEqual(t, port, p.provision.config.ModeLocal.PortPool.From)
51+
assert.LessOrEqual(t, port, p.provision.config.ModeLocal.PortPool.To)
52+
53+
// Try to free a non-existing port.
54+
err = p.freePort(1)
55+
assert.EqualError(t, err, "port 1 is out of bounds of the port pool")
56+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.