Hystrix:分布式系统的高性能及高可用解决方案(一)

Hystrix在业界广泛使用,是一把解决分布式系统间服务依赖的性能及可用性问题的利器。本系列文章主要整理Hystrix有哪些功能。

废话少说,现在一步步来揭开它神秘的面纱吧!

Hello world

这是HystrixCommand的一个Hello world实现:

public class CommandHelloWorld extends HystrixCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        //在实际应用中这里为调用服务
        return "Hello " + name + "!";
    }
}

我们也可以使用HystrixObservableCommand来实现相同的功能:

public class CommandHelloWorld extends HystrixObservableCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected Observable<String> construct() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> observer) {
                try {
                    if (!observer.isUnsubscribed()) {
                        // 在实际应用中这里为调用服务
                        observer.onNext("Hello");
                        observer.onNext(name + "!");
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
         } ).subscribeOn(Schedulers.io());
    }
}

同步执行

我们可以同步执行一个HystrixCommand,如下所示:

String s = new CommandHelloWorld("World").execute();

其执行结果可以通过如下测试case来体现:

@Test
public void testSynchronous() {
    assertEquals("Hello World!", new CommandHelloWorld("World").execute());
    assertEquals("Hello Bob!", new CommandHelloWorld("Bob").execute());
}

使用HystrixObservableCommand的话会稍微麻烦一点点,这个command产生的Observable只会产生一个值,我们可以通过对Observable调用.toBlocking().toFuture().get()来获得相同的效果。

异步执行

我们可以通过queue()方法来异步执行一个HystrixCommand:

Future<String> fs = new CommandHelloWorld("World").queue();

我们可以通过这个Future来获取这个command的结果:

String s = fs.get();

下面的单元测试描述了调用的行为结果:

@Test
public void testAsynchronous1() throws Exception {
    assertEquals("Hello World!", new CommandHelloWorld("World").queue().get());
    assertEquals("Hello Bob!", new CommandHelloWorld("Bob").queue().get());
}

@Test
public void testAsynchronous2() throws Exception {

    Future<String> fWorld = new CommandHelloWorld("World").queue();
    Future<String> fBob = new CommandHelloWorld("Bob").queue();

    assertEquals("Hello World!", fWorld.get());
    assertEquals("Hello Bob!", fBob.get());
}

下面的调用的结果是等价的:

String s1 = new CommandHelloWorld("World").execute();
String s2 = new CommandHelloWorld("World").queue().get();

对于HystrixObservableCommand,我们可以通过对其产生的Observable调用.toBlocking().toFuture()来获取Future。

响应式执行

我们可以通过如下方式来将HystrixCommand的结果转化成Observable对象,并使用观察者模式观察其结果:

  • observe():并且立即执行command并且返回Observable对象
  • toObservable():返回Observable对象,但只有订阅这个对象时才会执行command(懒加载)

使用方法如下:

Observable<String> ho = new CommandHelloWorld("World").observe();
// or Observable<String> co = new CommandHelloWorld("World").toObservable();

我们可以通过如下方式来获取command的结果:

ho.subscribe(new Action1<String>() {

    @Override
    public void call(String s) {
         // 在这里处理command的值
    }

});

下面的单元测试进一步阐述这个行为:

@Test
public void testObservable() throws Exception {

    Observable<String> fWorld = new CommandHelloWorld("World").observe();
    Observable<String> fBob = new CommandHelloWorld("Bob").observe();

    // 阻塞调用
    assertEquals("Hello World!", fWorld.toBlockingObservable().single());
    assertEquals("Hello Bob!", fBob.toBlockingObservable().single());

    // 非阻塞调用
    // - 匿名内部类
    fWorld.subscribe(new Observer<String>() {

        @Override
        public void onCompleted() {
            // 这里在本例中不需要处理
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onNext(String v) {
            System.out.println("onNext: " + v);
        }

    });

    // 非阻塞
    // - 另一个匿名内部类
    // - 忽略errors和onCompleted的通知
    fBob.subscribe(new Action1<String>() {

        @Override
        public void call(String v) {
            System.out.println("onNext: " + v);
        }

    });
}

使用Java8的lambda/闭包可以写的更加紧凑:

fWorld.subscribe((v) -> {
    System.out.println("onNext: " + v);
})

// - 包含异常处理

fWorld.subscribe((v) -> {
    System.out.println("onNext: " + v);
}, (exception) -> {
    exception.printStackTrace();
})

关于Observable的更多内容可以参考http://reactivex.io/documentation/observable.html

响应式的command

除了可以将HystrixCommand转化成Observable之外,我们也可以直接创建一个HystrixObservableCommand。HystrixObservableCommand可以包装一个Observable,而这个Observable可以产生多个元素;相比较之下,HystrixCommand转化得到的Observable最多只能产生一个元素。

在创建HystrixObservableCommand时,我们需要重写construct方法以返回一个Observable(HystrixCommand则需要重写run方法)。为了获取HystrixObservableCommand的Observable视图,我们可以使用如下任意一个方法:

  • observe():并且立即执行command并且返回Observable对象
  • toObservable():返回Observable对象,但只有订阅这个对象时才会执行command(懒加载)

降级

我们在使用Hystrix的command时可以添加一个fallback方法,这个fallback方法用于在主流程异常时提供一个默认的值。但对于某些command来说,主流程异常时我们不希望使用fallback:

  1. 执行写操作的command:如果我们的command是用来执行写操作而不是读操作的话(写操作的command通常返回void),实现fallback方法没有什么意义。如果写失败,我们更希望通知调用者这个失败消息。
  2. 执行批处理或者离线处理的command:如果我们的command是执行填充缓存、产生报表或者其他离线运算的话,与其返回降级的结果值,不如通知调用者失败消息,这样的话调用者可以执行重试操作。

无论是否实现fallback方法,当command执行失败时,Hystrix以及熔断器都会更新其状态。

对于HystrixCommand来说,我们只需要实现getFallback()方法即可;当出现异常时(run()异常、超时、线程池或者信号量拒绝、熔断器短路等等),Hystrix会执行这个方法。例子如下:

public class CommandHelloFailure extends HystrixCommand<String> {

    private final String name;

    public CommandHelloFailure(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        throw new RuntimeException("this command always fails");
    }

    @Override
    protected String getFallback() {
        return "Hello Failure " + name + "!";
    }
}

上面的command执行run方法时会失败,但是调用者会接收到getFallback()方法的返回值而不是接收到一个异常:

@Test
public void testSynchronous() {
    assertEquals("Hello Failure World!", new CommandHelloFailure("World").execute());
    assertEquals("Hello Failure Bob!", new CommandHelloFailure("Bob").execute());
}

HystrixObservableCommand的fallback

对于HystrixObservableCommand来说,我们需要重写resumeWithFallback方法,这个方法在主流程异常时返回另一个Observable以替代主流程的Observable。需要注意的是,一个Observable可能在产生了一个或多个元素后才失败,因此观察者(observer)可能既看到主流程Observable产生的元素,也看到这个降级的Observable产生的元素。

在内部实现中,如果发生异常,Hystrix使用RxJava的onErrorResumeNext来从主流程无缝降级到fallback流程。

异常传播

run()方法的所有异常(除了HystrixBadRequestException)都会触发getFallback()以及熔断器逻辑。

我们可以使用HystrixBadRequestException来封装想抛出的异常,并通过getCause()方法来获取。HystrixBadRequestException是用来反馈参数错误或者非系统错误等异常,这些异常不应该当做失败而且不应该触发熔断器逻辑。

对于HystrixObservableCommand来说,Observable通过onError方法来通知不可恢复的异常,并通过resumeWithFallback方法来降级到另一个Observable。

异常类型

错误类型 异常类 异常源 降级
FAILURE HystrixRuntimeException 用户自定义异常
TIMEOUT HystrixRuntimeException j.u.c.TimeoutException
SHORT_CIRCUITED HystrixRuntimeException j.l.RuntimeException
THREAD_POOL_REJECTED HystrixRuntimeException j.u.c.RejectedExecutionException 是)
SEMAPHORE_REJECTED HystrixRuntimeException j.l.RuntimeException
BAD_REQUEST HystrixBadRequestException 用户自定义

command的名称

command名称默认为类名称:

 

getClass().getSimpleName();

我们也可以在构造方法中显式定义command名称:

public CommandHelloWorld(String name) {
    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
            .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")));
    this.name = name;
}

如果想减少Setter分配,我们也可以缓存Setter:

private static final Setter cachedSetter = 
    Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
        .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"));    

public CommandHelloWorld(String name) {
    super(cachedSetter);
    this.name = name;
}

HystrixCommandKey是一个接口类型,可以实现为enum类型或者class类型,但它也提供了factory来构造一个内部实例:

HystrixCommandKey.Factory.asKey("HelloWorld")

command的组

Hystrix使用来区分不同类型的command,例如上报组,告警组,仪表盘组等等。默认情况下,Hystrix使用组来定义command的线程池,除非已经显式定义另外一个了。

HystrixCommandGroupKey是一个接口类型,

HystrixCommandGroupKey是一个接口类型,可以实现为enum类型或者class类型,但它也提供了factory来构造一个内部实例:

HystrixCommandGroupKey.Factory.asKey("ExampleGroup")

command的线程池

一个HystrixCommand与一个HystrixThreadPool关联,这个HystrixThreadPool可以通过HystrixThreadPoolKey获取,默认情况下通过HystrixCommandGroupKey来获取。

可以通过HystrixCommand或者HystrixObservableCommand的构造方法来显式命名:

public CommandHelloWorld(String name) {
    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
            .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
            .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
    this.name = name;
}

HystrixThreadPoolKey是一个接口类型,可以实现为enum类型或者class类型,但它也提供了factory来构造一个内部实例:

HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")

将HystrixThreadPoolKey与HystrixCommandGroupKey分离的原因是,多个command可能在逻辑功能上属于同一个组,但是其中某些command可能需要隔离。

举个例子,假设有两个command需要访问视频资源,组名称为“Video”,command A访问资源#1,command B访问资源#2;如果command A有延迟并且使用了线程池的全部线程,它不应该对B产生影响,因为A和B访问不同的后端资源。因此,我们希望将这些命令归组但物理上相互隔离,每个command使用单独一个HystrixThreadPoolKey。

Written on August 30, 2017