Spring Cloud服务熔断
Spring Cloud Hystrix
使用- 基于
Spring Aop
自定义服务熔断 Spring Cloud Hystrix
源码分析
Spring Cloud服务熔断
版本信息
Spring Cloud : Hoxton.SR1
Spring Boot : 2.2.2.RELEASE
Spring Cloud Hystrix
使用
熔断策略
THREAD
— it executes on a separate thread and concurrent requests are limited by the number of threads in the thread-poolSEMAPHORE
— it executes on the calling thread and concurrent requests are limited by the semaphore count
- 线程方式 超时,基于 Thread Pool + Future实现
- 信号量方式 限流,基于Semaphore实现
基本使用
添加
pom
依赖1
2
3
4
5
6
7
8
9
10
11
12
13
14
15<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--hystrix-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>服务启动类添加
@EnableCircuitBreaker
1
2
3
4
5
6
7
8
9
10
11
12
13
14/**
* 服务熔断应用
*
* @author FelixFly <chenglinxu@yeah.net>
* @date 2020/2/5
*/
@SpringBootApplication
@EnableCircuitBreaker
public class CircuitBreakerApplication {
public static void main(String[] args) {
SpringApplication.run(CircuitBreakerApplication.class, args);
}
}注解方式演示服务端点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46/**
* 注解演示服务端点
*
* @author FelixFly <chenglinxu@yeah.net>
* @date 2020/2/5
*/
@RestController
@RequestMapping("/annotation")
public class AnnotationDemoController {
@HystrixCommand(fallbackMethod = "fallBack", commandProperties = {
@HystrixProperty(name = "execution.isolation.strategy", value = "THREAD"),
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "50")
})
@GetMapping("/thread/{message}")
public String thread(@PathVariable String message) {
await();
return "Thread:" + message;
}
@HystrixCommand(fallbackMethod = "fallBack", commandProperties = {
@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE"),
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "50"),
@HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests", value = "1")
})
@GetMapping("/semaphore/{message}")
public String hello(@PathVariable String message) {
await();
return "Semaphore:" + message;
}
public String fallBack(String message) {
return "Fallback:" + message;
}
public void await() {
int waitTime = new Random().nextInt(100);
try {
System.out.printf("线程【%s】执行时间%d毫秒\n", Thread.currentThread().getName(), waitTime);
TimeUnit.MILLISECONDS.sleep(waitTime);
} catch (InterruptedException e) {
throw new RuntimeException("线程中断");
}
}
}@HystrixCommand
配置属性:https://github.com/Netflix/Hystrix/wiki/Configuration#CommandProperties
配置文件
application.yml
1
2server:
port: 8090
Thread方式访问地址:http://127.0.0.1:8090/annotation/thread/123
会出现两种结果
Thread:123
正常返回结果Fallback:123
超时返回结果
控制台打印
1
2
3
4
5
线程【hystrix-DemoController-6】执行时间59毫秒
线程【hystrix-DemoController-7】执行时间12毫秒
线程【hystrix-DemoController-8】执行时间44毫秒
线程【hystrix-DemoController-9】执行时间22毫秒
线程【hystrix-DemoController-10】执行时间58毫秒从控制台来看,这个切换线程了
Semaphore
访问地址:http://127.0.0.1:8090/annotation/semaphore/123
会出现两种结果
Semaphore:123
正常返回结果Fallback:123
超时返回结果
控制台打印
1
2
3
4
5
6
7
8
线程【http-nio-8090-exec-1】执行时间0毫秒
线程【http-nio-8090-exec-2】执行时间50毫秒
线程【http-nio-8090-exec-3】执行时间53毫秒
线程【http-nio-8090-exec-4】执行时间1毫秒
线程【http-nio-8090-exec-5】执行时间35毫秒
线程【http-nio-8090-exec-6】执行时间91毫秒
线程【http-nio-8090-exec-7】执行时间5毫秒
线程【http-nio-8090-exec-8】执行时间68毫秒从控制台来看,这个没有切换线程
基于Spring Aop
自定义服务熔断
服务熔断策略枚举
1 |
|
服务熔断注解
1 |
|
服务熔断器(采用策略+模板方法模式)
服务熔断器接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27/**
* 服务熔断器
*
* @author FelixFly <chenglinxu@yeah.net>
* @date 2020/2/5
*/
public interface CircuitBreaker {
/**
* 支持的策略
*
* @param strategy 策略
* @return true 支持 false 不支持
*/
boolean isSupport(CircuitBreakerStrategy strategy);
/**
* 执行方法
*
* @param joinPoint 切面执行
* @param method 执行方法
* @param circuitBreakerCommand 注解信息
* @return 执行结果
*/
Object execute(ProceedingJoinPoint joinPoint, Method method, CircuitBreakerCommand circuitBreakerCommand) throws Exception;
}服务熔断器抽象实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73/**
* 服务熔断器抽象实现
*
* @author FelixFly <chenglinxu@yeah.net>
* @date 2020/2/5
*/
public abstract class AbstractCircuitBreaker implements CircuitBreaker {
@Override
public Object execute(ProceedingJoinPoint joinPoint, Method method, CircuitBreakerCommand circuitBreakerCommand)
throws Exception {
// 目标对象
Object target = joinPoint.getTarget();
// 执行方法参数
Object[] args = joinPoint.getArgs();
try {
return internalExecute(method, joinPoint, circuitBreakerCommand);
} catch (TimeoutException exception) {
// 对TimeoutException异常进行回调方法
String fallBack = circuitBreakerCommand.fallBack();
if (StringUtils.isNotBlank(fallBack)) {
return invokeFallBackMethod(method, target, circuitBreakerCommand, args);
}
throw exception;
}
}
/**
* 内部执行方法
*
* @param method 执行方法
* @param joinPoint 切面执行
* @param circuitBreakerCommand 服务熔断注解信息
* @return 执行结果
* @throws TimeoutException 超时异常
*/
public abstract Object internalExecute(Method method, ProceedingJoinPoint joinPoint,
CircuitBreakerCommand circuitBreakerCommand)
throws Exception;
/**
* 执行服务熔断方法
*
* @param method 执行方法
* @param target 目前对象
* @param circuitBreakerCommand 服务熔断注解信息
* @param args 方法参数
* @return 执行结果
*/
protected Object invokeFallBackMethod(Method method, Object target,
CircuitBreakerCommand circuitBreakerCommand, Object[] args) throws Exception {
Method fallBackMethod = fallBackMethod(method, target, circuitBreakerCommand);
return fallBackMethod.invoke(target, args);
}
/**
* 获取回调方法
*
* @param method 执行方法
* @param target 目前对象
* @param circuitBreakerCommand 服务熔断注解信息
* @return 目标方法
* @throws NoSuchMethodException 无此目标方法
*/
private Method fallBackMethod(Method method, Object target,
CircuitBreakerCommand circuitBreakerCommand) throws NoSuchMethodException {
String fallBackMethodName = circuitBreakerCommand.fallBack();
Class<?> beanClass = target.getClass();
return beanClass.getMethod(fallBackMethodName, method.getParameterTypes());
}
}超时熔断器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60/**
* {@link CircuitBreakerStrategy#THREAD} 超时熔断器
*
* @author FelixFly <chenglinxu@yeah.net>
* @date 2020/2/5
*/
@Service
public class ThreadCircuitBreaker extends AbstractCircuitBreaker {
private ExecutorService executorService = Executors.newFixedThreadPool(2);
@Override
public boolean isSupport(CircuitBreakerStrategy strategy) {
return CircuitBreakerStrategy.THREAD.equals(strategy);
}
@Override
public Object internalExecute(Method method, ProceedingJoinPoint joinPoint,
CircuitBreakerCommand circuitBreakerCommand) throws Exception {
// 执行方法参数
Object[] args = joinPoint.getArgs();
// 进行异步调用
Future<Object> future = executorService.submit(() -> {
try {
return joinPoint.proceed(args);
} catch (Throwable throwable) {
throw new RuntimeException("调用目标方法错误");
}
});
try {
return future.get(circuitBreakerCommand.timeout(), circuitBreakerCommand.timeUnit());
} catch (TimeoutException timeoutException) {
future.cancel(true);//取消执行
throw timeoutException;
}
// 使用CompletableFuture进行异步调用,超时异常没有办法取消调用
/*CompletableFuture<Object> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
return joinPoint.proceed(args);
} catch (Throwable throwable) {
throw new RuntimeException("调用目标方法错误");
}
});
try {
return completableFuture.get(circuitBreakerCommand.timeout(), circuitBreakerCommand.timeUnit());
} catch (TimeoutException timeoutException) {
// this value has no effect in this implementation because interrupts are not used to control processing.
completableFuture.cancel(true);//取消执行,无法进行取消
throw timeoutException;
}*/
}
@PreDestroy
public void destroy(){
executorService.shutdown();
}
}限流熔断器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38/**
* {@link CircuitBreakerStrategy#SEMAPHORE} 限流熔断器
*
* @author FelixFly <chenglinxu@yeah.net>
* @date 2020/2/5
*/
@Service
public class SemaphoreCircuitBreaker extends AbstractCircuitBreaker {
private Map<Method, Semaphore> methodSemaphoreMap = new ConcurrentHashMap<>(128);
@Override
public boolean isSupport(CircuitBreakerStrategy strategy) {
return CircuitBreakerStrategy.SEMAPHORE.equals(strategy);
}
@Override
public Object internalExecute(Method method, ProceedingJoinPoint joinPoint,
CircuitBreakerCommand circuitBreakerCommand) throws Exception {
Semaphore semaphore = initSemaphore(method, circuitBreakerCommand.semaphore());
boolean tryAcquire = semaphore.tryAcquire(circuitBreakerCommand.timeout(), circuitBreakerCommand.timeUnit());
if (!tryAcquire) {
throw new TimeoutException("获取信号量超时");
}
try {
return joinPoint.proceed(joinPoint.getArgs());
} catch (Throwable throwable) {
throw new RuntimeException("调用目标方法错误");
} finally {
semaphore.release();
}
}
private Semaphore initSemaphore(Method method, int semaphore) {
return methodSemaphoreMap.computeIfAbsent(method,
value -> new Semaphore(semaphore));
}
}
服务熔断切面
1 |
|
自定义服务端点
1 |
|
Thread方式访问地址:http://127.0.0.1:8090/custom/thread/123
会出现两种结果
Thread:123
正常返回结果Fallback:123
超时返回结果
Semaphore
访问地址:http://127.0.0.1:8090/custom/semaphore/123
会出现两种结果
Semaphore:123
正常返回结果Fallback:123
超时返回结果
Spring Cloud Hystrix
源码分析
@EnableCircuitBreaker
分析
1 |
|
导入了
EnableCircuitBreakerImportSelector
选择器1
2
3
4
5
6
7
8
9
10
11@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
extends SpringFactoryImportSelector<EnableCircuitBreaker> {
@Override
protected boolean isEnabled() {
return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
Boolean.class, Boolean.TRUE);
}
}父类
SpringFactoryImportSelector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76public abstract class SpringFactoryImportSelector<T>
implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware {
private final Log log = LogFactory.getLog(SpringFactoryImportSelector.class);
private ClassLoader beanClassLoader;
private Class<T> annotationClass;
private Environment environment;
@SuppressWarnings("unchecked")
protected SpringFactoryImportSelector() {
this.annotationClass = (Class<T>) GenericTypeResolver
.resolveTypeArgument(this.getClass(), SpringFactoryImportSelector.class);
}
@Override
public String[] selectImports(AnnotationMetadata metadata) {
if (!isEnabled()) {
return new String[0];
}
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
metadata.getAnnotationAttributes(this.annotationClass.getName(), true));
Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
+ metadata.getClassName() + " annotated with @" + getSimpleName() + "?");
// Find all possible auto configuration classes, filtering duplicates
List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
.loadFactoryNames(this.annotationClass, this.beanClassLoader)));
if (factories.isEmpty() && !hasDefaultFactory()) {
throw new IllegalStateException("Annotation @" + getSimpleName()
+ " found, but there are no implementations. Did you forget to include a starter?");
}
if (factories.size() > 1) {
// there should only ever be one DiscoveryClient, but there might be more than
// one factory
this.log.warn("More than one implementation " + "of @" + getSimpleName()
+ " (now relying on @Conditionals to pick one): " + factories);
}
return factories.toArray(new String[factories.size()]);
}
protected boolean hasDefaultFactory() {
return false;
}
protected abstract boolean isEnabled();
protected String getSimpleName() {
return this.annotationClass.getSimpleName();
}
protected Class<T> getAnnotationClass() {
return this.annotationClass;
}
protected Environment getEnvironment() {
return this.environment;
}
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.beanClassLoader = classLoader;
}
}通过
SpringFactoriesLoader
加载了EnableCircuitBreaker
的配置类查找这个
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker
配置类发现spring-cloud-netflix-hystrix
包下META-INF/spring.factories
有如下配置1
2org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
HystrixCircuitBreakerConfiguration
Hystrix
服务熔断配置
1 |
|
HystrixCommandAspect
HystrixCommand
切面
1 |
|
从代码来看切面拦截了两个注解,@HystrixCommand
和@HystrixCollapser
,前面已经熟悉过@HystrixCommand
,这个@HystrixCollapser
又是什么内容
@HystrixCollapser
注解
1 |
|
从@HystrixCollapser
这个代码注释来看是针对Future
返回值来做的,理解来看是多个@HystrixCommand
组合注解
Spring Cloud 官方文档没有对
@HystrixCollapser
注解类的介绍
1 |
|
@HystrixCollapser
配置信息:https://github.com/Netflix/Hystrix/wiki/Configuration#Collapser
访问地址:http://127.0.0.1:8090/annotation/collapser/123
1 |
|
暂时还没发现具体的用处,等发现的时候来继续补充