Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 99b743a

Browse filesBrowse files
committed
feat: run sync instance in order to keep data from physical backup up-to-date (#145)
1 parent 039cde8 commit 99b743a
Copy full SHA for 99b743a

File tree

Expand file treeCollapse file tree

9 files changed

+193
-83
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

9 files changed

+193
-83
lines changed
Open diff view settings
Collapse file

‎cmd/database-lab/main.go‎

Copy file name to clipboardExpand all lines: cmd/database-lab/main.go
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/docker/docker/client"
1818
"github.com/jessevdk/go-flags"
1919
"github.com/pkg/errors"
20+
"github.com/rs/xid"
2021

2122
"gitlab.com/postgres-ai/database-lab/pkg/config"
2223
"gitlab.com/postgres-ai/database-lab/pkg/log"
@@ -72,6 +73,8 @@ func main() {
7273
cfg.Global.MountDir = cfg.Provision.ModeLocal.MountDir
7374
}
7475

76+
cfg.Global.InstanceID = xid.New().String()
77+
7578
ctx, cancel := context.WithCancel(context.Background())
7679
defer cancel()
7780

Collapse file

‎configs/config.example.physical_generic.yml‎

Copy file name to clipboardExpand all lines: configs/config.example.physical_generic.yml
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ retrieval:
158158
options:
159159
tool: customTool
160160
dockerImage: "postgresai/sync-instance:12"
161+
syncInstance: true
161162

162163
# Set environment variables here. See https://www.postgresql.org/docs/current/libpq-envars.html
163164
envs:
Collapse file

‎configs/config.example.physical_walg.yml‎

Copy file name to clipboardExpand all lines: configs/config.example.physical_walg.yml
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ retrieval:
158158
options:
159159
tool: walg
160160
dockerImage: "postgresai/sync-instance:12"
161+
syncInstance: true
161162
envs:
162163
WALG_GS_PREFIX: "gs://{BUCKET}/{SCOPE}"
163164
walg:
Collapse file

‎pkg/config/config.go‎

Copy file name to clipboardExpand all lines: pkg/config/config.go
+4-3Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ type Config struct {
3434

3535
// Global contains global Database Lab configurations.
3636
type Global struct {
37-
Engine string `yaml:"engine"`
38-
DataDir string `yaml:"dataDir"`
39-
MountDir string // TODO (akartasov): Use MountDir for the ModeLocalConfig of a Provision service.
37+
InstanceID string
38+
Engine string `yaml:"engine"`
39+
DataDir string `yaml:"dataDir"`
40+
MountDir string // TODO (akartasov): Use MountDir for the ModeLocalConfig of a Provision service.
4041
}
4142

4243
// LoadConfig instances a new Config by configuration filename.
Collapse file

‎pkg/retrieval/engine/postgres/initialize/physical/custom.go‎

Copy file name to clipboardExpand all lines: pkg/retrieval/engine/postgres/initialize/physical/custom.go
+12-5Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
package physical
66

77
import (
8-
"strings"
8+
"bytes"
9+
"fmt"
910

1011
"github.com/docker/docker/api/types/mount"
1112
)
@@ -19,7 +20,8 @@ type custom struct {
1920
}
2021

2122
type customOptions struct {
22-
Command string `yaml:"command"`
23+
RestoreCommand string `yaml:"command"`
24+
RefreshCommand string `yaml:"refresh_command"`
2325
}
2426

2527
func newCustomTool(options customOptions) *custom {
@@ -39,11 +41,16 @@ func (c *custom) GetMounts() []mount.Mount {
3941
}
4042

4143
// GetRestoreCommand returns a custom command to restore data.
42-
func (c *custom) GetRestoreCommand() []string {
43-
return strings.Split(c.options.Command, " ")
44+
func (c *custom) GetRestoreCommand() string {
45+
return c.options.RestoreCommand
4446
}
4547

4648
// GetRecoveryConfig returns a recovery config to restore data.
4749
func (c *custom) GetRecoveryConfig() []byte {
48-
return []byte{}
50+
buffer := bytes.Buffer{}
51+
52+
buffer.WriteString("standby_mode = 'on'\n")
53+
buffer.WriteString(fmt.Sprintf("restore_command = '%s'\n", c.options.RefreshCommand))
54+
55+
return buffer.Bytes()
4956
}
Collapse file

‎pkg/retrieval/engine/postgres/initialize/physical/physical.go‎

Copy file name to clipboardExpand all lines: pkg/retrieval/engine/postgres/initialize/physical/physical.go
+110-50Lines changed: 110 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ const (
3939
// RestoreJobType defines the physical job type.
4040
RestoreJobType = "physical-restore"
4141

42-
restoreContainerName = "retriever_physical_restore"
43-
restoreContainerPath = "/var/lib/postgresql/dblabdata"
42+
restoreContainerPrefix = "dblab_phr_"
43+
restoreContainerPath = "/var/lib/postgresql/dblabdata"
4444

4545
readyLogLine = "database system is ready to accept"
4646

@@ -59,11 +59,12 @@ type RestoreJob struct {
5959

6060
// CopyOptions describes options for physical copying.
6161
type CopyOptions struct {
62-
Tool string `yaml:"tool"`
63-
DockerImage string `yaml:"dockerImage"`
64-
Envs map[string]string `yaml:"envs"`
65-
WALG walgOptions `yaml:"walg"`
66-
CustomTool customOptions `yaml:"customTool"`
62+
Tool string `yaml:"tool"`
63+
DockerImage string `yaml:"dockerImage"`
64+
Envs map[string]string `yaml:"envs"`
65+
WALG walgOptions `yaml:"walg"`
66+
CustomTool customOptions `yaml:"customTool"`
67+
SyncInstance bool `yaml:"syncInstance"`
6768
}
6869

6970
// restorer describes the interface of tools for physical restore.
@@ -75,7 +76,7 @@ type restorer interface {
7576
GetMounts() []mount.Mount
7677

7778
// GetRestoreCommand returns a command to restore data.
78-
GetRestoreCommand() []string
79+
GetRestoreCommand() string
7980

8081
// GetRecoveryConfig returns a recovery config to restore data.
8182
GetRecoveryConfig() []byte
@@ -117,6 +118,10 @@ func (r *RestoreJob) getRestorer(tool string) (restorer, error) {
117118
return nil, errors.Errorf("unknown restore tool given: %v", tool)
118119
}
119120

121+
func (r *RestoreJob) restoreContainerName() string {
122+
return restoreContainerPrefix + r.globalCfg.InstanceID
123+
}
124+
120125
// Name returns a name of the job.
121126
func (r *RestoreJob) Name() string {
122127
return r.name
@@ -132,49 +137,26 @@ func (r *RestoreJob) Run(ctx context.Context) error {
132137
}
133138

134139
if !isEmpty {
135-
return errors.New("the data directory is not empty. Clean the data directory before continue")
140+
return errors.New("the data directory is not empty. Clean the data directory before proceeding")
136141
}
137142

138-
cont, err := r.dockerClient.ContainerCreate(ctx,
139-
&container.Config{
140-
Env: r.getEnvironmentVariables(),
141-
Image: r.DockerImage,
142-
},
143-
&container.HostConfig{
144-
Mounts: r.getMountVolumes(),
145-
},
146-
&network.NetworkingConfig{},
147-
restoreContainerName,
148-
)
149-
143+
contID, err := r.startReplica(ctx, r.restoreContainerName())
150144
if err != nil {
151-
return errors.Wrap(err, "failed to create container")
145+
return errors.Wrapf(err, "failed to create container: %s", r.restoreContainerName())
152146
}
153147

154-
defer func() {
155-
if err := r.dockerClient.ContainerRemove(ctx, cont.ID, types.ContainerRemoveOptions{
156-
Force: true,
157-
}); err != nil {
158-
log.Err("Failed to remove container: ", err)
159-
160-
return
161-
}
162-
163-
log.Msg(fmt.Sprintf("Stop container: %s. ID: %v", restoreContainerName, cont.ID))
164-
}()
148+
defer tools.RemoveContainer(ctx, r.dockerClient, contID, tools.StopTimeout)
165149

166-
defer tools.RemoveContainer(ctx, r.dockerClient, cont.ID, tools.StopTimeout)
150+
log.Msg(fmt.Sprintf("Running container: %s. ID: %v", r.restoreContainerName(), contID))
167151

168-
log.Msg(fmt.Sprintf("Running container: %s. ID: %v", restoreContainerName, cont.ID))
169-
170-
if err = r.dockerClient.ContainerStart(ctx, cont.ID, types.ContainerStartOptions{}); err != nil {
171-
return errors.Wrap(err, "failed to start container")
152+
if err = r.dockerClient.ContainerStart(ctx, contID, types.ContainerStartOptions{}); err != nil {
153+
return errors.Wrapf(err, "failed to start container: %v", contID)
172154
}
173155

174156
log.Msg("Running restore command")
175157

176-
if err := tools.ExecCommand(ctx, r.dockerClient, cont.ID, types.ExecConfig{
177-
Cmd: r.restorer.GetRestoreCommand(),
158+
if err := tools.ExecCommand(ctx, r.dockerClient, contID, types.ExecConfig{
159+
Cmd: []string{"bash", "-c", r.restorer.GetRestoreCommand()},
178160
}); err != nil {
179161
return errors.Wrap(err, "failed to restore data")
180162
}
@@ -209,20 +191,14 @@ func (r *RestoreJob) Run(ctx context.Context) error {
209191
}
210192

211193
// Set permissions.
212-
if err := tools.ExecCommand(ctx, r.dockerClient, cont.ID, types.ExecConfig{
194+
if err := tools.ExecCommand(ctx, r.dockerClient, contID, types.ExecConfig{
213195
Cmd: []string{"chown", "-R", "postgres", restoreContainerPath},
214196
}); err != nil {
215197
return errors.Wrap(err, "failed to set permissions")
216198
}
217199

218200
// Start PostgreSQL instance.
219-
startCommand, err := r.dockerClient.ContainerExecCreate(ctx, cont.ID, types.ExecConfig{
220-
AttachStdout: true,
221-
AttachStderr: true,
222-
Tty: true,
223-
Cmd: []string{"postgres", "-D", restoreContainerPath},
224-
User: defaults.Username,
225-
})
201+
startCommand, err := r.dockerClient.ContainerExecCreate(ctx, contID, startingPostgresConfig())
226202

227203
if err != nil {
228204
return errors.Wrap(err, "failed to create an exec command")
@@ -241,11 +217,51 @@ func (r *RestoreJob) Run(ctx context.Context) error {
241217
return errors.Wrap(err, "failed to refresh data")
242218
}
243219

244-
log.Msg("Running restore command")
220+
log.Msg("Refresh command has been finished")
221+
222+
if r.CopyOptions.SyncInstance {
223+
if err := r.runSyncInstance(ctx); err != nil {
224+
log.Err("Failed to run sync instance", err)
225+
}
226+
}
245227

246228
return nil
247229
}
248230

231+
func (r *RestoreJob) startReplica(ctx context.Context, containerName string) (string, error) {
232+
syncInstance, err := r.dockerClient.ContainerCreate(ctx,
233+
&container.Config{
234+
Env: r.getEnvironmentVariables(),
235+
Image: r.DockerImage,
236+
},
237+
&container.HostConfig{
238+
Mounts: r.getMountVolumes(),
239+
},
240+
&network.NetworkingConfig{},
241+
containerName,
242+
)
243+
244+
if err != nil {
245+
return "", errors.Wrap(err, "failed to start sync container")
246+
}
247+
248+
if err = r.dockerClient.ContainerStart(ctx, syncInstance.ID, types.ContainerStartOptions{}); err != nil {
249+
return "", errors.Wrap(err, "failed to start sync container")
250+
}
251+
252+
return syncInstance.ID, nil
253+
}
254+
255+
func startingPostgresConfig() types.ExecConfig {
256+
return types.ExecConfig{
257+
AttachStdout: true,
258+
AttachStderr: true,
259+
Tty: true,
260+
Cmd: []string{"postgres", "-D", restoreContainerPath},
261+
User: defaults.Username,
262+
}
263+
}
264+
249265
func isDatabaseReady(input io.Reader) error {
250266
scanner := bufio.NewScanner(input)
251267

@@ -272,7 +288,51 @@ func isDatabaseReady(input io.Reader) error {
272288
return err
273289
}
274290

275-
return errors.New("not found")
291+
return errors.New("database instance is not running")
292+
}
293+
294+
func (r *RestoreJob) syncInstanceName() string {
295+
return tools.SyncInstanceContainerPrefix + r.globalCfg.InstanceID
296+
}
297+
298+
func (r *RestoreJob) runSyncInstance(ctx context.Context) error {
299+
syncContainer, err := r.dockerClient.ContainerInspect(ctx, r.syncInstanceName())
300+
if err != nil && !client.IsErrNotFound(err) {
301+
return errors.Wrap(err, "failed to inspect sync container")
302+
}
303+
304+
if syncContainer.ContainerJSONBase != nil {
305+
if syncContainer.State.Running {
306+
log.Msg("Sync instance is already running")
307+
return nil
308+
}
309+
310+
log.Msg("Removing non-running sync instance")
311+
312+
tools.RemoveContainer(ctx, r.dockerClient, syncContainer.ID, tools.StopTimeout)
313+
}
314+
315+
log.Msg("Starting sync instance: ", r.syncInstanceName())
316+
317+
syncInstanceID, err := r.startReplica(ctx, r.syncInstanceName())
318+
if err != nil {
319+
return err
320+
}
321+
322+
startSyncCommand, err := r.dockerClient.ContainerExecCreate(ctx, syncInstanceID, startingPostgresConfig())
323+
if err != nil {
324+
return errors.Wrap(err, "failed to create exec command")
325+
}
326+
327+
if err = r.dockerClient.ContainerExecStart(ctx, startSyncCommand.ID, types.ExecStartCheck{Tty: true}); err != nil {
328+
return errors.Wrap(err, "failed to attach to exec command")
329+
}
330+
331+
if err := tools.InspectCommandResponse(ctx, r.dockerClient, startSyncCommand.ID, startSyncCommand.ID); err != nil {
332+
return errors.Wrap(err, "failed to perform exec command")
333+
}
334+
335+
return nil
276336
}
277337

278338
func (r *RestoreJob) getEnvironmentVariables() []string {
Collapse file

‎pkg/retrieval/engine/postgres/initialize/physical/wal_g.go‎

Copy file name to clipboardExpand all lines: pkg/retrieval/engine/postgres/initialize/physical/wal_g.go
+2-4Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package physical
77
import (
88
"bytes"
99
"fmt"
10-
"time"
1110

1211
"github.com/docker/docker/api/types/mount"
1312
)
@@ -64,16 +63,15 @@ func (w *walg) GetMounts() []mount.Mount {
6463
}
6564

6665
// GetRestoreCommand returns a command to restore data.
67-
func (w *walg) GetRestoreCommand() []string {
68-
return []string{"wal-g", "backup-fetch", restoreContainerPath, w.options.BackupName}
66+
func (w *walg) GetRestoreCommand() string {
67+
return fmt.Sprintf("wal-g backup-fetch %s %s", restoreContainerPath, w.options.BackupName)
6968
}
7069

7170
// GetRecoveryConfig returns a recovery config to restore data.
7271
func (w *walg) GetRecoveryConfig() []byte {
7372
buffer := bytes.Buffer{}
7473

7574
buffer.WriteString("standby_mode = 'on'\n")
76-
buffer.WriteString(fmt.Sprintf("recovery_target_time = '%s'\n", time.Now().Format("2006-02-01 15:04:05")))
7775
buffer.WriteString("restore_command = 'wal-g wal-fetch %f %p'\n")
7876

7977
return buffer.Bytes()

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.