Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

bunnier/circuit

Open more actions menu

Repository files navigation

circuit

Go

简介

本项目提供处理服务熔断限流等逻辑的工具。

文档

GO Docs:https://pkg.go.dev/github.com/bunnier/circuit

本包主要通过 Command 对象交互,使用时通过circuit.NewCommand() 函数创建 Command对象,之后通过 Command.Execute() 方法可在熔断器上执行初始时传入的功能函数。

初始化 Command 对象时,可通过选项函数 circuit.WithCommandBreaker() 传入特定熔断器,包中目前内置了如下两个熔断器供选择:

  • CutBreaker(默认):提供了常规的断路器模式的熔断器。类似hystrix的算法,维护 OpenHalf-OpenClosed 3个状态,错误率达到阈值后OpenOpen后休眠指定时间转变为Half-Open,之后允许一个请求探测,如恢复正常,则Closed,反之重新进入Open
  • SreBreaker:实现了Google SRE提出的 adaptive throttling 自适应熔断器,算法介绍参考:https://sre.google/sre-book/handling-overload/#eq2101

DEMO

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/bunnier/circuit"
)

/**
* 功能函数签名:CommandFunc,该类型注释有详细说明。
* 降级函数(可选)签名:CommandFallbackFunc,该类型注释有详细说明。
* 更多用法也可参考command_test.go中的测试用例。
 */
func main() {
	// 功能函数,简单的通过参数true/false来控制成功失败。
	run := func(ctx context.Context, param interface{}) (interface{}, error) {
		if success := param.(bool); !success {
			return nil, errors.New("error")
		}
		return "ok", nil
	}

	// 降级函数(可选)。
	fallback := func(ctx context.Context, param interface{}, e error) (interface{}, error) {
		return "fallback", nil
	}

	// 初始化Command。
	// 默认参数5s内20次以上,50%失败率后开启熔断器。
	command := circuit.NewCommand(
		"test", run,
		// 默认为CutBreaker,这里可通过选项函数切换为SreBreaker,后面的例子使用CutBreaker,所以下面这行注释掉。
		// circuit.WithCommandBreaker(breaker.NewSreBreaker("test")),
		circuit.WithCommandFallback(fallback),
		circuit.WithCommandTimeout(time.Second*5))

	defer command.Close() // 主要用于释放command中开启的统计goroutine。

	var wg sync.WaitGroup

	// 模拟20次请求,10个成功,10个失败,让其刚好到临界。
	wg.Add(20)
	for i := 0; i < 20; i++ {
		go func(res bool) {
			command.Execute(res)
			wg.Done()
		}(i%2 == 0)
	}
	wg.Wait()

	// 窗口期内再来一个错误请求,开启熔断器。
	res, _ := command.Execute(false)
	fmt.Printf("step1: %s\n", res) // fallback。

	// 开启熔断器后再模拟10并发个请求,都会直接走降级函数。
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func() {
			res, _ = command.Execute(true)
			fmt.Printf("step2: %s\n", res) // fallback。
			wg.Done()
		}()
	}
	wg.Wait()

	// 熔断器开启5s后进入半开状态。
	time.Sleep(5 * time.Second)

	// 默认使用“一刀切”的恢复算法,半开状态下,只能有一个请求进入尝试,通过就重置统计,不通过重新完全开启熔断器。
	// 这里模拟一个不通过的请求,将重新开启熔断器。
	_, _ = command.Execute(false)
	res, _ = command.Execute(true)
	fmt.Printf("step3: %s\n", res) // fallback。

	// 再次休息5s,再次进入半开状态。
	time.Sleep(5 * time.Second)

	// 半开状态时候请求成功,将重置统计。
	_, _ = command.Execute(true)

	// 重置后模拟10个并发成功请求,都被会执行~
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			res, _ = command.Execute(true)
			fmt.Printf("step4: %s\n", res) // ok
			wg.Done()
		}()
	}
	wg.Wait()
}

下一步

  • 支持计数器限流;
  • 提供订阅状态变化的hook;
  • 提供状态观察接口;

About

本项目提供处理服务熔断、限流、超时等逻辑的工具。

Topics

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages

Morty Proxy This is a proxified and sanitized view of the page, visit original site.