Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 1490221

Browse filesBrowse files
authored
Support client-side specification of consistency for DescribeWorkflowExecution and GetWorkflowExecutionHistory (#6789)
* feat(frontend): Support client requested consistency levels Some clients require strong consistency for reads on the DescribeWorkflowExecution and GetWorkflowExecutionHistory rpcs, and are willing to accept the latency trade-off. This adds internal support for endpoints to define a consistency level. * Add tests and remove QueryWorkflowStrongConsistency from configuration * make pr
1 parent 281693e commit 1490221
Copy full SHA for 1490221

File tree

Expand file treeCollapse file tree

7 files changed

+533
-316
lines changed
Filter options
Expand file treeCollapse file tree

7 files changed

+533
-316
lines changed

‎common/types/shared.go

Copy file name to clipboardExpand all lines: common/types/shared.go
+20-2Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,8 +1808,9 @@ func (v *DescribeTaskListResponse) GetTaskListStatus() (o *TaskListStatus) {
18081808

18091809
// DescribeWorkflowExecutionRequest is an internal type (TBD...)
18101810
type DescribeWorkflowExecutionRequest struct {
1811-
Domain string `json:"domain,omitempty"`
1812-
Execution *WorkflowExecution `json:"execution,omitempty"`
1811+
Domain string `json:"domain,omitempty"`
1812+
Execution *WorkflowExecution `json:"execution,omitempty"`
1813+
QueryConsistencyLevel *QueryConsistencyLevel `json:"queryConsistencyLevel,omitempty"`
18131814
}
18141815

18151816
// GetDomain is an internal getter (TBD...)
@@ -1828,6 +1829,14 @@ func (v *DescribeWorkflowExecutionRequest) GetExecution() (o *WorkflowExecution)
18281829
return
18291830
}
18301831

1832+
// GetQueryConsistencyLevel is an internal getter (TBD...)
1833+
func (v *DescribeWorkflowExecutionRequest) GetQueryConsistencyLevel() (o QueryConsistencyLevel) {
1834+
if v != nil && v.QueryConsistencyLevel != nil {
1835+
return *v.QueryConsistencyLevel
1836+
}
1837+
return
1838+
}
1839+
18311840
// DescribeWorkflowExecutionResponse is an internal type (TBD...)
18321841
type DescribeWorkflowExecutionResponse struct {
18331842
ExecutionConfiguration *WorkflowExecutionConfiguration `json:"executionConfiguration,omitempty"`
@@ -2599,6 +2608,7 @@ type GetWorkflowExecutionHistoryRequest struct {
25992608
WaitForNewEvent bool `json:"waitForNewEvent,omitempty"`
26002609
HistoryEventFilterType *HistoryEventFilterType `json:"HistoryEventFilterType,omitempty"`
26012610
SkipArchival bool `json:"skipArchival,omitempty"`
2611+
QueryConsistencyLevel *QueryConsistencyLevel `json:"queryConsistencyLevel,omitempty"`
26022612
}
26032613

26042614
// GetDomain is an internal getter (TBD...)
@@ -2657,6 +2667,14 @@ func (v *GetWorkflowExecutionHistoryRequest) GetSkipArchival() (o bool) {
26572667
return
26582668
}
26592669

2670+
// GetQueryConsistencyLevel is an internal getter (TBD...)
2671+
func (v *GetWorkflowExecutionHistoryRequest) GetQueryConsistencyLevel() (o QueryConsistencyLevel) {
2672+
if v != nil && v.QueryConsistencyLevel != nil {
2673+
return *v.QueryConsistencyLevel
2674+
}
2675+
return
2676+
}
2677+
26602678
// GetWorkflowExecutionHistoryResponse is an internal type (TBD...)
26612679
type GetWorkflowExecutionHistoryResponse struct {
26622680
History *History `json:"history,omitempty"`

‎service/frontend/templates/clusterredirection.tmpl

Copy file name to clipboardExpand all lines: service/frontend/templates/clusterredirection.tmpl
+18-42Lines changed: 18 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ import (
1212
frontendcfg "github.com/uber/cadence/service/frontend/config"
1313
)
1414

15-
{{$nonFowradingAPIs := list "Health" "DeprecateDomain" "DescribeDomain" "ListDomains" "RegisterDomain" "UpdateDomain" "GetSearchAttributes" "GetClusterInfo" "DiagnoseWorkflowExecution"}}
15+
{{$nonForwardingAPIs := list "Health" "DeprecateDomain" "DescribeDomain" "ListDomains" "RegisterDomain" "UpdateDomain" "GetSearchAttributes" "GetClusterInfo" "DiagnoseWorkflowExecution"}}
1616
{{$domainIDAPIs := list "RecordActivityTaskHeartbeat" "RespondActivityTaskCanceled" "RespondActivityTaskCompleted" "RespondActivityTaskFailed" "RespondDecisionTaskCompleted" "RespondDecisionTaskFailed" "RespondQueryTaskCompleted"}}
1717
{{$queryTaskTokenAPIs := list "RespondQueryTaskCompleted"}}
18-
{{$specialCaseAPIs := list "QueryWorkflow"}}
18+
{{$readAPIsWithStrongConsistency := list "QueryWorkflow" "DescribeWorkflowExecution" "GetWorkflowExecutionHistory"}}
1919

2020
type (
2121
// ClusterRedirectionHandlerImpl is simple wrapper over frontend service, doing redirection based on policy for global domains not being active in current cluster
@@ -55,13 +55,24 @@ func NewAPIHandler(
5555
}
5656

5757
{{range $method := .Interface.Methods}}
58-
{{- if not (has $method.Name $specialCaseAPIs)}}
5958
func (handler *clusterRedirectionHandler) {{$method.Declaration}} {
60-
{{- if has $method.Name $nonFowradingAPIs}}
59+
{{- if has $method.Name $nonForwardingAPIs}}
6160
return handler.frontendHandler.{{$method.Call}}
6261
{{- else}}
63-
var apiName = "{{$method.Name}}"
64-
var cluster string
62+
var (
63+
apiName = "{{$method.Name}}"
64+
cluster string
65+
requestedConsistencyLevel types.QueryConsistencyLevel = types.QueryConsistencyLevelEventual
66+
)
67+
68+
{{- if has $method.Name $readAPIsWithStrongConsistency}}
69+
// Only autoforward strong consistent queries, this is done for two reasons:
70+
// 1. Query is meant to be fast, autoforwarding all queries will increase latency.
71+
// 2. If eventual consistency was requested then the results from running out of local dc will be fine.
72+
if {{(index $method.Params 1).Name}}.GetQueryConsistencyLevel() == types.QueryConsistencyLevelStrong {
73+
requestedConsistencyLevel = types.QueryConsistencyLevelStrong
74+
}
75+
{{- end}}
6576

6677
{{$policyMethod := "WithDomainNameRedirect"}}
6778
{{$domain := printf "%s.GetDomain()" (index $method.Params 1).Name}}
@@ -94,7 +105,7 @@ func (handler *clusterRedirectionHandler) {{$method.Declaration}} {
94105
}
95106
{{- end}}
96107

97-
err = handler.redirectionPolicy.{{$policyMethod}}(ctx, {{$domain}}, apiName, func(targetDC string) error {
108+
err = handler.redirectionPolicy.{{$policyMethod}}(ctx, {{$domain}}, apiName, requestedConsistencyLevel, func(targetDC string) error {
98109
cluster = targetDC
99110
switch {
100111
case targetDC == handler.currentClusterName:
@@ -110,38 +121,3 @@ func (handler *clusterRedirectionHandler) {{$method.Declaration}} {
110121
{{- end}}
111122
}
112123
{{end}}
113-
{{end}}
114-
115-
func (handler *clusterRedirectionHandler) QueryWorkflow(
116-
ctx context.Context,
117-
request *types.QueryWorkflowRequest,
118-
) (resp *types.QueryWorkflowResponse, retError error) {
119-
var apiName = "QueryWorkflow"
120-
var err error
121-
var cluster string
122-
123-
// Only autoforward strong consistent queries, this is done for two reasons:
124-
// 1. Query is meant to be fast, autoforwarding all queries will increase latency.
125-
// 2. If eventual consistency was requested then the results from running out of local dc will be fine.
126-
if request.GetQueryConsistencyLevel() == types.QueryConsistencyLevelStrong {
127-
apiName = "QueryWorkflowStrongConsistency"
128-
}
129-
scope, startTime := handler.beforeCall(metrics.DCRedirectionQueryWorkflowScope)
130-
defer func() {
131-
handler.afterCall(recover(), scope, startTime, request.GetDomain(), "", cluster, &retError)
132-
}()
133-
134-
err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
135-
cluster = targetDC
136-
switch {
137-
case targetDC == handler.currentClusterName:
138-
resp, err = handler.frontendHandler.QueryWorkflow(ctx, request)
139-
default:
140-
remoteClient := handler.GetRemoteFrontendClient(targetDC)
141-
resp, err = remoteClient.QueryWorkflow(ctx, request, handler.callOptions...)
142-
}
143-
return err
144-
})
145-
146-
return resp, err
147-
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.