Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Move cadence server's app to common #6909

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
Loading
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 4 additions & 101 deletions 105 cmd/server/cadence/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,17 @@
package cadence

import (
"context"
"errors"
"fmt"
stdLog "log"
"os"
"path/filepath"
"strings"
"sync"

"github.com/urfave/cli/v2"
"go.uber.org/fx"
"go.uber.org/multierr"

"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/service/cadence"

_ "go.uber.org/automaxprocs" // defines automaxpocs for dockerized usage.
)
Expand Down Expand Up @@ -117,7 +112,7 @@ func BuildCLI(releaseVersion string, gitRevision string) *cli.App {
return fmt.Errorf("get hostname: %w", err)
}

appCtx := appContext{
appCtx := cadence.AppContext{
CfgContext: config.Context{
Environment: getEnvironment(c),
Zone: getZone(c),
Expand All @@ -129,91 +124,14 @@ func BuildCLI(releaseVersion string, gitRevision string) *cli.App {

services := getServices(c)

return runServices(
services,
func(serviceName string) fxAppInterface {
return fx.New(
fx.Module(serviceName,
_commonModule,
fx.Provide(
func() appContext {
return appCtx
},
),
Module(serviceName),
),
)
},
)
return cadence.Run(services, appCtx)
},
},
}

return app
}

func runServices(services []string, appBuilder func(serviceName string) fxAppInterface) error {
stoppedWg := &sync.WaitGroup{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errChan := make(chan error, len(services))

for _, serv := range services {
stoppedWg.Add(1)
go func(s string) {
defer stoppedWg.Done()
fxApp := appBuilder(s)

// If any of the start hooks return an error, Start short-circuits, calls Stop, and returns the inciting error.
if err := fxApp.Start(ctx); err != nil {
// If any of the apps fails to start, immediately cancel the context so others will also stop.
cancel()
errChan <- fmt.Errorf("service %s start: %w", s, err)
return
}

select {
// Block until FX receives a shutdown signal
case <-fxApp.Done():
}

// Stop the application
err := fxApp.Stop(ctx)
if err != nil {
errChan <- fmt.Errorf("service %s stop: %w", s, err)
}
}(serv)
}
go func() {
stoppedWg.Wait()
// After stoppedWg unblocked all services are stopped to we no longer wait for errors.
close(errChan)
}()

var resErrors error
for err := range errChan {
// skip canceled errors, since they are caused by context cancelation and only focus on actual errors.
if err != nil && !errors.Is(err, context.Canceled) {
resErrors = multierr.Append(resErrors, err)
}
}
if resErrors != nil {
return resErrors
}
return nil
}

type appContext struct {
fx.Out

CfgContext config.Context
ConfigDir string `name:"config-dir"`
RootDir string `name:"root-dir"`
HostName string `name:"hostname"`
}

func getEnvironment(c *cli.Context) string {
return strings.TrimSpace(c.String("env"))
}
Expand Down Expand Up @@ -242,7 +160,7 @@ func getServices(c *cli.Context) []string {
}

func getConfigDir(c *cli.Context) string {
return constructPathIfNeed(getRootDir(c), c.String("config"))
return cadence.ConstructPathIfNeed(getRootDir(c), c.String("config"))
}

func getRootDir(c *cli.Context) string {
Expand All @@ -256,18 +174,3 @@ func getRootDir(c *cli.Context) string {
}
return dirpath
}

// constructPathIfNeed would append the dir as the root dir
// when the file wasn't absolute path.
func constructPathIfNeed(dir string, file string) string {
if !filepath.IsAbs(file) {
return dir + "/" + file
}
return file
}

type fxAppInterface interface {
Start(context.Context) error
Stop(context.Context) error
Done() <-chan os.Signal
}
224 changes: 4 additions & 220 deletions 224 cmd/server/cadence/cadence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,12 @@
package cadence

import (
"context"
"errors"
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/fx"
"go.uber.org/fx/fxtest"

"github.com/uber/cadence/common/service/cadence"
)

type CadenceSuite struct {
Expand All @@ -59,216 +53,6 @@ func (s *CadenceSuite) TestIsValidService() {
}

func (s *CadenceSuite) TestPath() {
s.Equal("foo/bar", constructPathIfNeed("foo", "bar"))
s.Equal("/bar", constructPathIfNeed("foo", "/bar"))
}

// MockFxApp implements fxAppInterface for testing
type MockFxApp struct {
StartFunc func(context.Context) error
StopFunc func(context.Context) error
DoneFunc func() <-chan os.Signal
}

func (m *MockFxApp) Start(ctx context.Context) error {
return m.StartFunc(ctx)
}

func (m *MockFxApp) Stop(ctx context.Context) error {
return m.StopFunc(ctx)
}

func (m *MockFxApp) Done() <-chan os.Signal {
return m.DoneFunc()
}

// TestRunServicesSuccess tests successful service execution
func TestRunServicesSuccess(t *testing.T) {
// Create a test application using fxtest
app := fxtest.New(t,
fx.Provide(func() string { return "test-service" }),
fx.Invoke(func(s string) {
// Just a simple component that does nothing
}),
)

// Create a done channel
done := make(chan os.Signal, 1)

// Wrap fxtest.App in our interface
appInterface := &MockFxApp{
StartFunc: app.Start,
StopFunc: app.Stop,
DoneFunc: func() <-chan os.Signal { return done },
}

// Run in a goroutine
errCh := make(chan error, 1)
go func() {
errCh <- runServices([]string{"service1"}, func(name string) fxAppInterface {
return appInterface
})
}()

// Give it a moment to start
time.Sleep(50 * time.Millisecond)

// Send signal to stop
close(done)

// Check result
err := <-errCh
assert.NoError(t, err)
}

// TestRunServicesStartError tests failure during service start
func TestRunServicesStartError(t *testing.T) {
// Create a mock app that fails on start
startError := errors.New("failed to start")
app := &MockFxApp{
StartFunc: func(ctx context.Context) error {
return startError
},
StopFunc: func(ctx context.Context) error {
return nil
},
DoneFunc: func() <-chan os.Signal {
ch := make(chan os.Signal)
return ch
},
}

// Run the services
err := runServices([]string{"service1"}, func(name string) fxAppInterface {
return app
})

// Verify error was returned
assert.Error(t, err)
assert.True(t, errors.Is(err, startError), "Error chain should contain the original error")
}

// TestRunServicesStopError tests failure during service stop
func TestRunServicesStopError(t *testing.T) {
// Create a mock app that fails on stop
stopError := errors.New("failed to stop")
done := make(chan os.Signal, 1)
app := &MockFxApp{
StartFunc: func(ctx context.Context) error {
return nil
},
StopFunc: func(ctx context.Context) error {
return stopError
},
DoneFunc: func() <-chan os.Signal {
return done
},
}

// Run in a goroutine
errCh := make(chan error, 1)
go func() {
errCh <- runServices([]string{"service1"}, func(name string) fxAppInterface {
return app
})
}()

// Give it a moment to start
time.Sleep(50 * time.Millisecond)

// Signal that the service is done
close(done)

// Check the error
err := <-errCh
assert.Error(t, err)
assert.True(t, errors.Is(err, stopError), "Error chain should contain the stop error")
}

// TestRunServicesCascadeFailure tests that when one service fails, others get stopped
func TestRunServicesCascadeFailure(t *testing.T) {
// We'll use this to track context cancellation
var contextCancelled bool
var contextMu sync.Mutex

// Create two apps - one will fail, one will succeed but should be stopped
startErr := errors.New("service 2 failed to start")

// Track app lifecycle events
app1Started := false
app1Stopped := false
app2Started := false

// First app - will start successfully
done1 := make(chan os.Signal, 1)
app1 := &MockFxApp{
StartFunc: func(ctx context.Context) error {
app1Started = true

// Monitor for context cancellation in a goroutine
go func() {
<-ctx.Done()
contextMu.Lock()
contextCancelled = true
contextMu.Unlock()
// Close done channel to simulate app stopping due to context
close(done1)
}()

return nil
},
StopFunc: func(ctx context.Context) error {
app1Stopped = true
return nil
},
DoneFunc: func() <-chan os.Signal {
return done1
},
}

// Second app - will fail to start
app2 := &MockFxApp{
StartFunc: func(ctx context.Context) error {
app2Started = true
return startErr
},
StopFunc: func(ctx context.Context) error {
t.Fatal("App2 Stop should never be called since Start failed")
return nil
},
DoneFunc: func() <-chan os.Signal {
ch := make(chan os.Signal)
return ch // Never signals
},
}

// Build app provider that returns different apps for different services
appProvider := func(name string) fxAppInterface {
switch name {
case "service1":
return app1
case "service2":
return app2
default:
t.Fatalf("Unexpected service name: %s", name)
return nil
}
}

// Run services
err := runServices([]string{"service1", "service2"}, appProvider)

// Verify results
require.Error(t, err, "Should return an error")
assert.True(t, app1Started, "App1 should have started")
assert.True(t, app2Started, "App2 should have started")
assert.True(t, app1Stopped, "App1 should have been stopped due to context cancellation")

// Check if context was cancelled
contextMu.Lock()
assert.True(t, contextCancelled, "Context should have been cancelled")
contextMu.Unlock()

// Check error content
assert.Contains(t, err.Error(), "service 2")
s.Equal("foo/bar", cadence.ConstructPathIfNeed("foo", "bar"))
s.Equal("/bar", cadence.ConstructPathIfNeed("foo", "/bar"))
}
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.