美文网首页
Google-Guava Concurrent 包里的 Serv

Google-Guava Concurrent 包里的 Serv

作者: ProfessorJason | 来源:发表于2019-06-11 11:54 被阅读0次

    概述

    Guava 包里的 Service 接口用于封装一个服务对象的运行状态、包括 start 和 stop 等方法。例如 web 服务器,RPC 服务器、计时器等可以实现这个接口。对此类服务的状态管理并不轻松、需要对服务的开启/关闭进行妥善管理、特别是在多线程环境下尤为复杂。Guava 包提供了一些基础类帮助你管理复杂的状态转换逻辑和同步细节。

    使用一个服务

    一个服务正常生命周期有:

    服务一旦被停止就无法再重新启动了。如果服务在 starting、running、stopping 状态出现问题、会进入 Service.State.FAILED.状态。调用 startAsync()方法可以异步开启一个服务,同时返回 this 对象形成方法调用链。注意:只有在当前服务的状态是 NEW 时才能调用 startAsync()方法,因此最好在应用中有一个统一的地方初始化相关服务。停止一个服务也是类似的、使用异步方法 stopAsync() 。但是不像 startAsync(),多次调用这个方法是安全的。这是为了方便处理关闭服务时候的锁竞争问题。

    Service 也提供了一些方法用于等待服务状态转换的完成:

    通过 addListener()方法异步添加监听器。此方法允许你添加一个 Service.Listener 、它会在每次服务状态转换的时候被调用。注意:最好在服务启动之前添加 Listener(这时的状态是 NEW)、否则之前已发生的状态转换事件是无法在新添加的 Listener上被重新触发的。

    同步使用 awaitRunning()。这个方法不能被打断、不强制捕获异常、一旦服务启动就会返回。如果服务没有成功启动,会抛出 IllegalStateException 异常。同样的, awaitTerminated() 方法会等待服务达到终止状态(TERMINATED 或者 FAILED)。两个方法都有重载方法允许传入超时时间。

    Service 接口本身实现起来会比较复杂、且容易碰到一些捉摸不透的问题。因此我们不推荐直接实现这个接口。而是请继承 Guava 包里已经封装好的基础抽象类。每个基础类支持一种特定的线程模型。

    基础实现类

    AbstractIdleService

    AbstractIdleService 类简单实现了 Service 接口、其在 running 状态时不会执行任何动作–因此在 running 时也不需要启动线程–但需要处理开启/关闭动作。要实现一个此类的服务,只需继承 AbstractIdleService 类,然后自己实现 startUp()shutDown()方法就可以了。

        protected void startUp() {
        servlets.add(new GcStatsServlet());
        }
        protected void shutDown() {}
    

    如上面的例子、由于任何请求到 GcStatsServlet 时已经会有现成线程处理了,所以在服务运行时就不需要做什么额外动作了。

    AbstractExecutionThreadService

    AbstractExecutionThreadService 通过单线程处理启动、运行、和关闭等操作。你必须重载 run()方法,同时需要能响应停止服务的请求。具体的实现可以在一个循环内做处理:

        public void run() {
          while (isRunning()) {
            // perform a unit of work
          }
        }
    

    另外,你还可以重载 triggerShutdown()方法让 run()方法结束返回。

    重载 startUp()和 shutDown()方法是可选的,不影响服务本身状态的管理

        protected void startUp() {
        dispatcher.listenForConnections(port, queue);
         }
         protected void run() {
           Connection connection;
           while ((connection = queue.take() != POISON)) {
         process(connection);
           }
         }
         protected void triggerShutdown() {
           dispatcher.stopListeningForConnections(queue);
           queue.put(POISON);
         }
    

    start()内部会调用 startUp()方法,创建一个线程、然后在线程内调用 run()方法。stop()会调用 triggerShutdown()方法并且等待线程终止。

    AbstractScheduledService

    AbstractScheduledService 类用于在运行时处理一些周期性的任务。子类可以实现 runOneIteration()方法定义一个周期执行的任务,以及相应的 startUp()和 shutDown()方法。为了能够描述执行周期,你需要实现 scheduler()方法。通常情况下,你可以使用 AbstractScheduledService.Scheduler 类提供的两种调度器:newFixedRateSchedule(initialDelay, delay, TimeUnit) 和newFixedDelaySchedule(initialDelay, delay, TimeUnit),类似于 JDK 并发包中 ScheduledExecutorService 类提供的两种调度方式。如要自定义 schedules 则可以使用 CustomScheduler 类来辅助实现;具体用法见 javadoc。

    AbstractService

    如需要自定义的线程管理、可以通过扩展 AbstractService 类来实现。一般情况下、使用上面的几个实现类就已经满足需求了,但如果在服务执行过程中有一些特定的线程处理需求、则建议继承 AbstractService 类。

    继承 AbstractService 方法必须实现两个方法.

    • doStart(): 首次调用 startAsync()时会同时调用 doStart(),doStart()内部需要处理所有的初始化工作、如果启动成功则调用 notifyStarted()方法;启动失败则调用 notifyFailed()

    • doStop(): 首次调用 stopAsync()会同时调用 doStop(),doStop()要做的事情就是停止服务,如果停止成功则调用 notifyStopped()方法;停止失败则调用 notifyFailed()方法。

    doStart 和 doStop 方法的实现需要考虑下性能,尽可能的低延迟。如果初始化的开销较大,如读文件,打开网络连接,或者其他任何可能引起阻塞的操作,建议移到另外一个单独的线程去处理。

    使用 ServiceManager

    除了对 Service 接口提供基础的实现类,Guava 还提供了 ServiceManager 类使得涉及到多个 Service 集合的操作更加容易。通过实例化 ServiceManager 类来创建一个 Service 集合,你可以通过以下方法来管理它们:

    • startAsync() : 将启动所有被管理的服务。如果当前服务的状态都是 NEW 的话、那么你只能调用该方法一次、这跟 Service#startAsync()是一样的。
    • stopAsync() :将停止所有被管理的服务。
    • addListener() : 会添加一个 ServiceManager.Listener,在服务状态转换中会调用该 Listener
    • awaitHealthy() :会等待所有的服务达到 Running 状态
    • awaitStopped():会等待所有服务达到终止状态

    检测类的方法有:

    • isHealthy() :如果所有的服务处于 Running 状态、会返回 True
    • servicesByState():以状态为索引返回当前所有服务的快照
    • startupTimes() :返回一个 Map 对象,记录被管理的服务启动的耗时、以毫秒为单位,同时 Map 默认按启动时间排序。

    我们建议整个服务的生命周期都能通过 ServiceManager 来管理,不过即使状态转换是通过其他机制触发的、也不影响 ServiceManager 方法的正确执行。例如:当一个服务不是通过 startAsync()、而是其他机制启动时,listeners 仍然可以被正常调用、awaitHealthy()也能够正常工作。ServiceManager 唯一强制的要求是当其被创建时所有的服务必须处于 New 状态。

    附:TestCase、也可以作为练习 Demo

    ServiceTest

        /*
         * Copyright (C) 2013 The Guava Authors
         *
         * Licensed under the Apache License, Version 2.0 (the "License");
         * you may not use this file except in compliance with the License.
         * You may obtain a copy of the License at
         *
         * http://www.apache.org/licenses/LICENSE-2.0
         *
         * Unless required by applicable law or agreed to in writing, software
         * distributed under the License is distributed on an "AS IS" BASIS,
         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
         * See the License for the specific language governing permissions and
         * limitations under the License.
         */
    
        package com.google.common.util.concurrent;
    
        import static com.google.common.util.concurrent.Service.State.FAILED;
        import static com.google.common.util.concurrent.Service.State.NEW;
        import static com.google.common.util.concurrent.Service.State.RUNNING;
        import static com.google.common.util.concurrent.Service.State.STARTING;
        import static com.google.common.util.concurrent.Service.State.STOPPING;
        import static com.google.common.util.concurrent.Service.State.TERMINATED;
    
        import junit.framework.TestCase;
    
        /**
         * Unit tests for {@link Service}
         */
        public class ServiceTest extends TestCase {
    
        /** Assert on the comparison ordering of the State enum since we guarantee it. */
         public void testStateOrdering() {
         // List every valid (direct) state transition.
         assertLessThan(NEW, STARTING);
         assertLessThan(NEW, TERMINATED);
    
         assertLessThan(STARTING, RUNNING);
         assertLessThan(STARTING, STOPPING);
         assertLessThan(STARTING, FAILED);
    
         assertLessThan(RUNNING, STOPPING);
         assertLessThan(RUNNING, FAILED);
    
         assertLessThan(STOPPING, FAILED);
         assertLessThan(STOPPING, TERMINATED);
         }
    
         private static <T extends Comparable<? super T>> void assertLessThan(T a, T b) {
         if (a.compareTo(b) >= 0) {
         fail(String.format("Expected %s to be less than %s", a, b));
         }
         }
        }
    

    AbstractIdleServiceTest

        /*
         * Copyright (C) 2009 The Guava Authors
         *
         * Licensed under the Apache License, Version 2.0 (the "License");
         * you may not use this file except in compliance with the License.
         * You may obtain a copy of the License at
         *
         * http://www.apache.org/licenses/LICENSE-2.0
         *
         * Unless required by applicable law or agreed to in writing, software
         * distributed under the License is distributed on an "AS IS" BASIS,
         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
         * See the License for the specific language governing permissions and
         * limitations under the License.
         */
    
        package com.google.common.util.concurrent;
    
        import static org.truth0.Truth.ASSERT;
    
        import com.google.common.collect.Lists;
    
        import junit.framework.TestCase;
    
        import java.util.List;
        import java.util.concurrent.Executor;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.TimeoutException;
    
        /**
         * Tests for {@link AbstractIdleService}.
         *
         * @author Chris Nokleberg
         * @author Ben Yu
         */
        public class AbstractIdleServiceTest extends TestCase {
    
        // Functional tests using real thread. We only verify publicly visible state.
         // Interaction assertions are done by the single-threaded unit tests.
    
        public static class FunctionalTest extends TestCase {
    
        private static class DefaultService extends AbstractIdleService {
         @Override protected void startUp() throws Exception {}
         @Override protected void shutDown() throws Exception {}
         }
    
        public void testServiceStartStop() throws Exception {
         AbstractIdleService service = new DefaultService();
         service.startAsync().awaitRunning();
         assertEquals(Service.State.RUNNING, service.state());
         service.stopAsync().awaitTerminated();
         assertEquals(Service.State.TERMINATED, service.state());
         }
    
        public void testStart_failed() throws Exception {
         final Exception exception = new Exception("deliberate");
         AbstractIdleService service = new DefaultService() {
         @Override protected void startUp() throws Exception {
         throw exception;
         }
         };
         try {
         service.startAsync().awaitRunning();
         fail();
         } catch (RuntimeException e) {
         assertSame(exception, e.getCause());
         }
         assertEquals(Service.State.FAILED, service.state());
         }
    
        public void testStop_failed() throws Exception {
         final Exception exception = new Exception("deliberate");
         AbstractIdleService service = new DefaultService() {
         @Override protected void shutDown() throws Exception {
         throw exception;
         }
         };
         service.startAsync().awaitRunning();
         try {
         service.stopAsync().awaitTerminated();
         fail();
         } catch (RuntimeException e) {
         assertSame(exception, e.getCause());
         }
         assertEquals(Service.State.FAILED, service.state());
         }
         }
    
        public void testStart() {
         TestService service = new TestService();
         assertEquals(0, service.startUpCalled);
         service.startAsync().awaitRunning();
         assertEquals(1, service.startUpCalled);
         assertEquals(Service.State.RUNNING, service.state());
         ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
         }
    
        public void testStart_failed() {
         final Exception exception = new Exception("deliberate");
         TestService service = new TestService() {
         @Override protected void startUp() throws Exception {
         super.startUp();
         throw exception;
         }
         };
         assertEquals(0, service.startUpCalled);
         try {
         service.startAsync().awaitRunning();
         fail();
         } catch (RuntimeException e) {
         assertSame(exception, e.getCause());
         }
         assertEquals(1, service.startUpCalled);
         assertEquals(Service.State.FAILED, service.state());
         ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
         }
    
        public void testStop_withoutStart() {
         TestService service = new TestService();
         service.stopAsync().awaitTerminated();
         assertEquals(0, service.startUpCalled);
         assertEquals(0, service.shutDownCalled);
         assertEquals(Service.State.TERMINATED, service.state());
         ASSERT.that(service.transitionStates).isEmpty();
         }
    
        public void testStop_afterStart() {
         TestService service = new TestService();
         service.startAsync().awaitRunning();
         assertEquals(1, service.startUpCalled);
         assertEquals(0, service.shutDownCalled);
         service.stopAsync().awaitTerminated();
         assertEquals(1, service.startUpCalled);
         assertEquals(1, service.shutDownCalled);
         assertEquals(Service.State.TERMINATED, service.state());
         ASSERT.that(service.transitionStates)
         .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
         }
    
        public void testStop_failed() {
         final Exception exception = new Exception("deliberate");
         TestService service = new TestService() {
         @Override protected void shutDown() throws Exception {
         super.shutDown();
         throw exception;
         }
         };
         service.startAsync().awaitRunning();
         assertEquals(1, service.startUpCalled);
         assertEquals(0, service.shutDownCalled);
         try {
         service.stopAsync().awaitTerminated();
         fail();
         } catch (RuntimeException e) {
         assertSame(exception, e.getCause());
         }
         assertEquals(1, service.startUpCalled);
         assertEquals(1, service.shutDownCalled);
         assertEquals(Service.State.FAILED, service.state());
         ASSERT.that(service.transitionStates)
         .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
         }
    
        public void testServiceToString() {
         AbstractIdleService service = new TestService();
         assertEquals("TestService [NEW]", service.toString());
         service.startAsync().awaitRunning();
         assertEquals("TestService [RUNNING]", service.toString());
         service.stopAsync().awaitTerminated();
         assertEquals("TestService [TERMINATED]", service.toString());
         }
    
        public void testTimeout() throws Exception {
         // Create a service whose executor will never run its commands
         Service service = new TestService() {
         @Override protected Executor executor() {
         return new Executor() {
         @Override public void execute(Runnable command) {}
         };
         }
         };
         try {
         service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS);
         fail("Expected timeout");
         } catch (TimeoutException e) {
         ASSERT.that(e.getMessage()).contains(Service.State.STARTING.toString());
         }
         }
    
        private static class TestService extends AbstractIdleService {
         int startUpCalled = 0;
         int shutDownCalled = 0;
         final List<State> transitionStates = Lists.newArrayList();
    
        @Override protected void startUp() throws Exception {
         assertEquals(0, startUpCalled);
         assertEquals(0, shutDownCalled);
         startUpCalled++;
         assertEquals(State.STARTING, state());
         }
    
        @Override protected void shutDown() throws Exception {
         assertEquals(1, startUpCalled);
         assertEquals(0, shutDownCalled);
         shutDownCalled++;
         assertEquals(State.STOPPING, state());
         }
    
        @Override protected Executor executor() {
         transitionStates.add(state());
         return MoreExecutors.sameThreadExecutor();
         }
         }
        }
    

    AbstractScheduledServiceTest

        /*
         * Copyright (C) 2011 The Guava Authors
         *
         * Licensed under the Apache License, Version 2.0 (the "License");
         * you may not use this file except in compliance with the License.
         * You may obtain a copy of the License at
         *
         * http://www.apache.org/licenses/LICENSE-2.0
         *
         * Unless required by applicable law or agreed to in writing, software
         * distributed under the License is distributed on an "AS IS" BASIS,
         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
         * See the License for the specific language governing permissions and
         * limitations under the License.
         */
    
        package com.google.common.util.concurrent;
    
        import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
        import com.google.common.util.concurrent.Service.State;
    
        import junit.framework.TestCase;
    
        import java.util.concurrent.CountDownLatch;
        import java.util.concurrent.CyclicBarrier;
        import java.util.concurrent.ExecutionException;
        import java.util.concurrent.Executors;
        import java.util.concurrent.Future;
        import java.util.concurrent.ScheduledExecutorService;
        import java.util.concurrent.ScheduledFuture;
        import java.util.concurrent.ScheduledThreadPoolExecutor;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.atomic.AtomicBoolean;
        import java.util.concurrent.atomic.AtomicInteger;
    
        /**
         * Unit test for {@link AbstractScheduledService}.
         *
         * @author Luke Sandberg
         */
    
        public class AbstractScheduledServiceTest extends TestCase {
    
        volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
         volatile ScheduledFuture<?> future = null;
    
        volatile boolean atFixedRateCalled = false;
         volatile boolean withFixedDelayCalled = false;
         volatile boolean scheduleCalled = false;
    
        final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
         @Override
         public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
         long delay, TimeUnit unit) {
         return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
         }
         };
    
        public void testServiceStartStop() throws Exception {
         NullService service = new NullService();
         service.startAsync().awaitRunning();
         assertFalse(future.isDone());
         service.stopAsync().awaitTerminated();
         assertTrue(future.isCancelled());
         }
    
        private class NullService extends AbstractScheduledService {
         @Override protected void runOneIteration() throws Exception {}
         @Override protected Scheduler scheduler() { return configuration; }
         @Override protected ScheduledExecutorService executor() { return executor; }
         }
    
        public void testFailOnExceptionFromRun() throws Exception {
         TestService service = new TestService();
         service.runException = new Exception();
         service.startAsync().awaitRunning();
         service.runFirstBarrier.await();
         service.runSecondBarrier.await();
         try {
         future.get();
         fail();
         } catch (ExecutionException e) {
         // An execution exception holds a runtime exception (from throwables.propogate) that holds our
         // original exception.
         assertEquals(service.runException, e.getCause().getCause());
         }
         assertEquals(service.state(), Service.State.FAILED);
         }
    
        public void testFailOnExceptionFromStartUp() {
         TestService service = new TestService();
         service.startUpException = new Exception();
         try {
         service.startAsync().awaitRunning();
         fail();
         } catch (IllegalStateException e) {
         assertEquals(service.startUpException, e.getCause());
         }
         assertEquals(0, service.numberOfTimesRunCalled.get());
         assertEquals(Service.State.FAILED, service.state());
         }
    
        public void testFailOnExceptionFromShutDown() throws Exception {
         TestService service = new TestService();
         service.shutDownException = new Exception();
         service.startAsync().awaitRunning();
         service.runFirstBarrier.await();
         service.stopAsync();
         service.runSecondBarrier.await();
         try {
         service.awaitTerminated();
         fail();
         } catch (IllegalStateException e) {
         assertEquals(service.shutDownException, e.getCause());
         }
         assertEquals(Service.State.FAILED, service.state());
         }
    
        public void testRunOneIterationCalledMultipleTimes() throws Exception {
         TestService service = new TestService();
         service.startAsync().awaitRunning();
         for (int i = 1; i < 10; i++) {
         service.runFirstBarrier.await();
         assertEquals(i, service.numberOfTimesRunCalled.get());
         service.runSecondBarrier.await();
         }
         service.runFirstBarrier.await();
         service.stopAsync();
         service.runSecondBarrier.await();
         service.stopAsync().awaitTerminated();
         }
    
        public void testExecutorOnlyCalledOnce() throws Exception {
         TestService service = new TestService();
         service.startAsync().awaitRunning();
         // It should be called once during startup.
         assertEquals(1, service.numberOfTimesExecutorCalled.get());
         for (int i = 1; i < 10; i++) {
         service.runFirstBarrier.await();
         assertEquals(i, service.numberOfTimesRunCalled.get());
         service.runSecondBarrier.await();
         }
         service.runFirstBarrier.await();
         service.stopAsync();
         service.runSecondBarrier.await();
         service.stopAsync().awaitTerminated();
         // Only called once overall.
         assertEquals(1, service.numberOfTimesExecutorCalled.get());
         }
    
        public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
         final CountDownLatch terminationLatch = new CountDownLatch(1);
         AbstractScheduledService service = new AbstractScheduledService() {
         volatile ScheduledExecutorService executorService;
         @Override protected void runOneIteration() throws Exception {}
    
        @Override protected ScheduledExecutorService executor() {
         if (executorService == null) {
         executorService = super.executor();
         // Add a listener that will be executed after the listener that shuts down the executor.
         addListener(new Listener() {
         @Override public void terminated(State from) {
         terminationLatch.countDown();
         }
         }, MoreExecutors.sameThreadExecutor());
         }
         return executorService;
         }
    
        @Override protected Scheduler scheduler() {
         return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
         }};
    
        service.startAsync();
         assertFalse(service.executor().isShutdown());
         service.awaitRunning();
         service.stopAsync();
         terminationLatch.await();
         assertTrue(service.executor().isShutdown());
         assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
         }
    
        public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
         final CountDownLatch failureLatch = new CountDownLatch(1);
         AbstractScheduledService service = new AbstractScheduledService() {
         volatile ScheduledExecutorService executorService;
         @Override protected void runOneIteration() throws Exception {}
    
        @Override protected void startUp() throws Exception {
         throw new Exception("Failed");
         }
    
        @Override protected ScheduledExecutorService executor() {
         if (executorService == null) {
         executorService = super.executor();
         // Add a listener that will be executed after the listener that shuts down the executor.
         addListener(new Listener() {
         @Override public void failed(State from, Throwable failure) {
         failureLatch.countDown();
         }
         }, MoreExecutors.sameThreadExecutor());
         }
         return executorService;
         }
    
        @Override protected Scheduler scheduler() {
         return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
         }};
    
        try {
         service.startAsync().awaitRunning();
         fail("Expected service to fail during startup");
         } catch (IllegalStateException expected) {}
         failureLatch.await();
         assertTrue(service.executor().isShutdown());
         assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
         }
    
        public void testSchedulerOnlyCalledOnce() throws Exception {
         TestService service = new TestService();
         service.startAsync().awaitRunning();
         // It should be called once during startup.
         assertEquals(1, service.numberOfTimesSchedulerCalled.get());
         for (int i = 1; i < 10; i++) {
         service.runFirstBarrier.await();
         assertEquals(i, service.numberOfTimesRunCalled.get());
         service.runSecondBarrier.await();
         }
         service.runFirstBarrier.await();
         service.stopAsync();
         service.runSecondBarrier.await();
         service.awaitTerminated();
         // Only called once overall.
         assertEquals(1, service.numberOfTimesSchedulerCalled.get());
         }
    
        private class TestService extends AbstractScheduledService {
         CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
         CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
    
        volatile boolean startUpCalled = false;
         volatile boolean shutDownCalled = false;
         AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
         AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
         AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
         volatile Exception runException = null;
         volatile Exception startUpException = null;
         volatile Exception shutDownException = null;
    
        @Override
         protected void runOneIteration() throws Exception {
         assertTrue(startUpCalled);
         assertFalse(shutDownCalled);
         numberOfTimesRunCalled.incrementAndGet();
         assertEquals(State.RUNNING, state());
         runFirstBarrier.await();
         runSecondBarrier.await();
         if (runException != null) {
         throw runException;
         }
         }
    
        @Override
         protected void startUp() throws Exception {
         assertFalse(startUpCalled);
         assertFalse(shutDownCalled);
         startUpCalled = true;
         assertEquals(State.STARTING, state());
         if (startUpException != null) {
         throw startUpException;
         }
         }
    
        @Override
         protected void shutDown() throws Exception {
         assertTrue(startUpCalled);
         assertFalse(shutDownCalled);
         shutDownCalled = true;
         if (shutDownException != null) {
         throw shutDownException;
         }
         }
    
        @Override
         protected ScheduledExecutorService executor() {
         numberOfTimesExecutorCalled.incrementAndGet();
         return executor;
         }
    
        @Override
         protected Scheduler scheduler() {
         numberOfTimesSchedulerCalled.incrementAndGet();
         return configuration;
         }
         }
    
        public static class SchedulerTest extends TestCase {
         // These constants are arbitrary and just used to make sure that the correct method is called
         // with the correct parameters.
         private static final int initialDelay = 10;
         private static final int delay = 20;
         private static final TimeUnit unit = TimeUnit.MILLISECONDS;
    
        // Unique runnable object used for comparison.
         final Runnable testRunnable = new Runnable() {@Override public void run() {}};
         boolean called = false;
    
        private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
         long delay, TimeUnit unit) {
         assertFalse(called); // only called once.
         called = true;
         assertEquals(SchedulerTest.initialDelay, initialDelay);
         assertEquals(SchedulerTest.delay, delay);
         assertEquals(SchedulerTest.unit, unit);
         assertEquals(testRunnable, command);
         }
    
        public void testFixedRateSchedule() {
         Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
         schedule.schedule(null, new ScheduledThreadPoolExecutor(1) {
         @Override
         public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
         long period, TimeUnit unit) {
         assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
         return null;
         }
         }, testRunnable);
         assertTrue(called);
         }
    
        public void testFixedDelaySchedule() {
         Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit);
         schedule.schedule(null, new ScheduledThreadPoolExecutor(10) {
         @Override
         public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
         long delay, TimeUnit unit) {
         assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
         return null;
         }
         }, testRunnable);
         assertTrue(called);
         }
    
        private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
         public AtomicInteger scheduleCounter = new AtomicInteger(0);
         @Override
         protected Schedule getNextSchedule() throws Exception {
         scheduleCounter.incrementAndGet();
         return new Schedule(0, TimeUnit.SECONDS);
         }
         }
    
        public void testCustomSchedule_startStop() throws Exception {
         final CyclicBarrier firstBarrier = new CyclicBarrier(2);
         final CyclicBarrier secondBarrier = new CyclicBarrier(2);
         final AtomicBoolean shouldWait = new AtomicBoolean(true);
         Runnable task = new Runnable() {
         @Override public void run() {
         try {
         if (shouldWait.get()) {
         firstBarrier.await();
         secondBarrier.await();
         }
         } catch (Exception e) {
         throw new RuntimeException(e);
         }
         }
         };
         TestCustomScheduler scheduler = new TestCustomScheduler();
         Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
         firstBarrier.await();
         assertEquals(1, scheduler.scheduleCounter.get());
         secondBarrier.await();
         firstBarrier.await();
         assertEquals(2, scheduler.scheduleCounter.get());
         shouldWait.set(false);
         secondBarrier.await();
         future.cancel(false);
         }
    
        public void testCustomSchedulerServiceStop() throws Exception {
         TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
         service.startAsync().awaitRunning();
         service.firstBarrier.await();
         assertEquals(1, service.numIterations.get());
         service.stopAsync();
         service.secondBarrier.await();
         service.awaitTerminated();
         // Sleep for a while just to ensure that our task wasn't called again.
         Thread.sleep(unit.toMillis(3 * delay));
         assertEquals(1, service.numIterations.get());
         }
    
        public void testBig() throws Exception {
         TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
         @Override protected Scheduler scheduler() {
         return new AbstractScheduledService.CustomScheduler() {
         @Override
         protected Schedule getNextSchedule() throws Exception {
         // Explicitly yield to increase the probability of a pathological scheduling.
         Thread.yield();
         return new Schedule(0, TimeUnit.SECONDS);
         }
         };
         }
         };
         service.useBarriers = false;
         service.startAsync().awaitRunning();
         Thread.sleep(50);
         service.useBarriers = true;
         service.firstBarrier.await();
         int numIterations = service.numIterations.get();
         service.stopAsync();
         service.secondBarrier.await();
         service.awaitTerminated();
         assertEquals(numIterations, service.numIterations.get());
         }
    
        private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
         final AtomicInteger numIterations = new AtomicInteger(0);
         volatile boolean useBarriers = true;
         final CyclicBarrier firstBarrier = new CyclicBarrier(2);
         final CyclicBarrier secondBarrier = new CyclicBarrier(2);
    
        @Override protected void runOneIteration() throws Exception {
         numIterations.incrementAndGet();
         if (useBarriers) {
         firstBarrier.await();
         secondBarrier.await();
         }
         }
    
        @Override protected ScheduledExecutorService executor() {
         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
         return Executors.newScheduledThreadPool(10);
         }
    
        @Override protected void startUp() throws Exception {}
    
        @Override protected void shutDown() throws Exception {}
    
        @Override protected Scheduler scheduler() {
         return new CustomScheduler() {
         @Override
         protected Schedule getNextSchedule() throws Exception {
         return new Schedule(delay, unit);
         }};
         }
         }
    
        public void testCustomSchedulerFailure() throws Exception {
         TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
         service.startAsync().awaitRunning();
         for (int i = 1; i < 4; i++) {
         service.firstBarrier.await();
         assertEquals(i, service.numIterations.get());
         service.secondBarrier.await();
         }
         Thread.sleep(1000);
         try {
         service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
         fail();
         } catch (IllegalStateException e) {
         assertEquals(State.FAILED, service.state());
         }
         }
    
        private static class TestFailingCustomScheduledService extends AbstractScheduledService {
         final AtomicInteger numIterations = new AtomicInteger(0);
         final CyclicBarrier firstBarrier = new CyclicBarrier(2);
         final CyclicBarrier secondBarrier = new CyclicBarrier(2);
    
        @Override protected void runOneIteration() throws Exception {
         numIterations.incrementAndGet();
         firstBarrier.await();
         secondBarrier.await();
         }
    
        @Override protected ScheduledExecutorService executor() {
         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
         return Executors.newScheduledThreadPool(10);
         }
    
        @Override protected Scheduler scheduler() {
         return new CustomScheduler() {
         @Override
         protected Schedule getNextSchedule() throws Exception {
         if (numIterations.get() > 2) {
         throw new IllegalStateException("Failed");
         }
         return new Schedule(delay, unit);
         }};
         }
         }
         }
        }
    

    AbstractServiceTest

        /*
         * Copyright (C) 2009 The Guava Authors
         *
         * Licensed under the Apache License, Version 2.0 (the "License");
         * you may not use this file except in compliance with the License.
         * You may obtain a copy of the License at
         *
         * http://www.apache.org/licenses/LICENSE-2.0
         *
         * Unless required by applicable law or agreed to in writing, software
         * distributed under the License is distributed on an "AS IS" BASIS,
         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
         * See the License for the specific language governing permissions and
         * limitations under the License.
         */
    
        package com.google.common.util.concurrent;
    
        import static java.lang.Thread.currentThread;
        import static java.util.concurrent.TimeUnit.SECONDS;
    
        import com.google.common.collect.ImmutableList;
        import com.google.common.collect.Iterables;
        import com.google.common.collect.Lists;
        import com.google.common.util.concurrent.Service.Listener;
        import com.google.common.util.concurrent.Service.State;
    
        import junit.framework.TestCase;
    
        import java.lang.Thread.UncaughtExceptionHandler;
        import java.util.List;
        import java.util.concurrent.CountDownLatch;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.atomic.AtomicInteger;
        import java.util.concurrent.atomic.AtomicReference;
    
        import javax.annotation.concurrent.GuardedBy;
    
        /**
         * Unit test for {@link AbstractService}.
         *
         * @author Jesse Wilson
         */
        public class AbstractServiceTest extends TestCase {
    
        private Thread executionThread;
         private Throwable thrownByExecutionThread;
    
        public void testNoOpServiceStartStop() throws Exception {
         NoOpService service = new NoOpService();
         RecordingListener listener = RecordingListener.record(service);
    
        assertEquals(State.NEW, service.state());
         assertFalse(service.isRunning());
         assertFalse(service.running);
    
        service.startAsync();
         assertEquals(State.RUNNING, service.state());
         assertTrue(service.isRunning());
         assertTrue(service.running);
    
        service.stopAsync();
         assertEquals(State.TERMINATED, service.state());
         assertFalse(service.isRunning());
         assertFalse(service.running);
         assertEquals(
         ImmutableList.of(
         State.STARTING,
         State.RUNNING,
         State.STOPPING,
         State.TERMINATED),
         listener.getStateHistory());
         }
    
        public void testNoOpServiceStartAndWaitStopAndWait() throws Exception {
         NoOpService service = new NoOpService();
    
        service.startAsync().awaitRunning();
         assertEquals(State.RUNNING, service.state());
    
        service.stopAsync().awaitTerminated();
         assertEquals(State.TERMINATED, service.state());
         }
    
        public void testNoOpServiceStartAsyncAndAwaitStopAsyncAndAwait() throws Exception {
         NoOpService service = new NoOpService();
    
        service.startAsync().awaitRunning();
         assertEquals(State.RUNNING, service.state());
    
        service.stopAsync().awaitTerminated();
         assertEquals(State.TERMINATED, service.state());
         }
    
        public void testNoOpServiceStopIdempotence() throws Exception {
         NoOpService service = new NoOpService();
         RecordingListener listener = RecordingListener.record(service);
         service.startAsync().awaitRunning();
         assertEquals(State.RUNNING, service.state());
    
        service.stopAsync();
         service.stopAsync();
         assertEquals(State.TERMINATED, service.state());
         assertEquals(
         ImmutableList.of(
         State.STARTING,
         State.RUNNING,
         State.STOPPING,
         State.TERMINATED),
         listener.getStateHistory());
         }
    
        public void testNoOpServiceStopIdempotenceAfterWait() throws Exception {
         NoOpService service = new NoOpService();
    
        service.startAsync().awaitRunning();
    
        service.stopAsync().awaitTerminated();
         service.stopAsync();
         assertEquals(State.TERMINATED, service.state());
         }
    
        public void testNoOpServiceStopIdempotenceDoubleWait() throws Exception {
         NoOpService service = new NoOpService();
    
        service.startAsync().awaitRunning();
         assertEquals(State.RUNNING, service.state());
    
        service.stopAsync().awaitTerminated();
         service.stopAsync().awaitTerminated();
         assertEquals(State.TERMINATED, service.state());
         }
    
        public void testNoOpServiceStartStopAndWaitUninterruptible()
         throws Exception {
         NoOpService service = new NoOpService();
    
        currentThread().interrupt();
         try {
         service.startAsync().awaitRunning();
         assertEquals(State.RUNNING, service.state());
    
        service.stopAsync().awaitTerminated();
         assertEquals(State.TERMINATED, service.state());
    
        assertTrue(currentThread().isInterrupted());
         } finally {
         Thread.interrupted(); // clear interrupt for future tests
         }
         }
    
        private static class NoOpService extends AbstractService {
         boolean running = false;
    
        @Override protected void doStart() {
         assertFalse(running);
         running = true;
         notifyStarted();
         }
    
        @Override protected void doStop() {
         assertTrue(running);
         running = false;
         notifyStopped();
         }
         }
    
        public void testManualServiceStartStop() throws Exception {
         ManualSwitchedService service = new ManualSwitchedService();
         RecordingListener listener = RecordingListener.record(service);
    
        service.startAsync();
         assertEquals(State.STARTING, service.state());
         assertFalse(service.isRunning());
         assertTrue(service.doStartCalled);
    
        service.notifyStarted(); // usually this would be invoked by another thread
         assertEquals(State.RUNNING, service.state());
         assertTrue(service.isRunning());
    
        service.stopAsync();
         assertEquals(State.STOPPING, service.state());
         assertFalse(service.isRunning());
         assertTrue(service.doStopCalled);
    
        service.notifyStopped(); // usually this would be invoked by another thread
         assertEquals(State.TERMINATED, service.state());
         assertFalse(service.isRunning());
         assertEquals(
         ImmutableList.of(
         State.STARTING,
         State.RUNNING,
         State.STOPPING,
         State.TERMINATED),
         listener.getStateHistory());
    
        }
    
        public void testManualServiceNotifyStoppedWhileRunning() throws Exception {
         ManualSwitchedService service = new ManualSwitchedService();
         RecordingListener listener = RecordingListener.record(service);
    
        service.startAsync();
         service.notifyStarted();
         service.notifyStopped();
         assertEquals(State.TERMINATED, service.state());
         assertFalse(service.isRunning());
         assertFalse(service.doStopCalled);
    
        assertEquals(
         ImmutableList.of(
         State.STARTING,
         State.RUNNING,
         State.TERMINATED),
         listener.getStateHistory());
         }
    
        public void testManualServiceStopWhileStarting() throws Exception {
         ManualSwitchedService service = new ManualSwitchedService();
         RecordingListener listener = RecordingListener.record(service);
    
        service.startAsync();
         assertEquals(State.STARTING, service.state());
         assertFalse(service.isRunning());
         assertTrue(service.doStartCalled);
    
        service.stopAsync();
         assertEquals(State.STOPPING, service.state());
         assertFalse(service.isRunning());
         assertFalse(service.doStopCalled);
    
        service.notifyStarted();
         assertEquals(State.STOPPING, service.state());
         assertFalse(service.isRunning());
         assertTrue(service.doStopCalled);
    
        service.notifyStopped();
         assertEquals(State.TERMINATED, service.state());
         assertFalse(service.isRunning());
         assertEquals(
         ImmutableList.of(
         State.STARTING,
         State.STOPPING,
         State.TERMINATED),
         listener.getStateHistory());
         }
    
        /**
         * This tests for a bug where if {@link Service#stopAsync()} was called while the service was
         * {@link State#STARTING} more than once, the {@link Listener#stopping(State)} callback would get
         * called multiple times.
         */
         public void testManualServiceStopMultipleTimesWhileStarting() throws Exception {
         ManualSwitchedService service = new ManualSwitchedService();
         final AtomicInteger stopppingCount = new AtomicInteger();
         service.addListener(new Listener() {
         @Override public void stopping(State from) {
         stopppingCount.incrementAndGet();
         }
         }, MoreExecutors.sameThreadExecutor());
    
        service.startAsync();
         service.stopAsync();
         assertEquals(1, stopppingCount.get());
         service.stopAsync();
         assertEquals(1, stopppingCount.get());
         }
    
        public void testManualServiceStopWhileNew() throws Exception {
         ManualSwitchedService service = new ManualSwitchedService();
         RecordingListener listener = RecordingListener.record(service);
    
        service.stopAsync();
         assertEquals(State.TERMINATED, service.state());
         assertFalse(service.isRunning());
         assertFalse(service.doStartCalled);
         assertFalse(service.doStopCalled);
         assertEquals(ImmutableList.of(State.TERMINATED), listener.getStateHistory());
         }
    
        public void testManualServiceFailWhileStarting() throws Exception {
         ManualSwitchedService service = new ManualSwitchedService();
         RecordingListener listener = RecordingListener.record(service);
         service.startAsync();
         service.notifyFailed(EXCEPTION);
         assertEquals(ImmutableList.of(State.STARTING, State.FAILED), listener.getStateHistory());
         }
    
        public void testManualServiceFailWhileRunning() throws Exception {
         ManualSwitchedService service = new ManualSwitchedService();
         RecordingListener listener = RecordingListener.record(service);
         service.startAsync();
         service.notifyStarted();
         service.notifyFailed(EXCEPTION);
         assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.FAILED),
         listener.getStateHistory());
         }
    
        public void testManualServiceFailWhileStopping() throws Exception {
         ManualSwitchedService service = new ManualSwitchedService();
         RecordingListener listener = RecordingListener.record(service);
         service.startAsync();
         service.notifyStarted();
         service.stopAsync();
         service.notifyFailed(EXCEPTION);
         assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.STOPPING, State.FAILED),
         listener.getStateHistory());
         }
    
        public void testManualServiceUnrequestedStop() {
         ManualSwitchedService service = new ManualSwitchedService();
    
        service.startAsync();
    
        service.notifyStarted();
         assertEquals(State.RUNNING, service.state());
         assertTrue(service.isRunning());
         assertFalse(service.doStopCalled);
    
        service.notifyStopped();
         assertEquals(State.TERMINATED, service.state());
         assertFalse(service.isRunning());
         assertFalse(service.doStopCalled);
         }
    
        /**
         * The user of this service should call {@link #notifyStarted} and {@link
         * #notifyStopped} after calling {@link #startAsync} and {@link #stopAsync}.
         */
         private static class ManualSwitchedService extends AbstractService {
         boolean doStartCalled = false;
         boolean doStopCalled = false;
    
        @Override protected void doStart() {
         assertFalse(doStartCalled);
         doStartCalled = true;
         }
    
        @Override protected void doStop() {
         assertFalse(doStopCalled);
         doStopCalled = true;
         }
         }
    
        public void testAwaitTerminated() throws Exception {
         final NoOpService service = new NoOpService();
         Thread waiter = new Thread() {
         @Override public void run() {
         service.awaitTerminated();
         }
         };
         waiter.start();
         service.startAsync().awaitRunning();
         assertEquals(State.RUNNING, service.state());
         service.stopAsync();
         waiter.join(100); // ensure that the await in the other thread is triggered
         assertFalse(waiter.isAlive());
         }
    
        public void testAwaitTerminated_FailedService() throws Exception {
         final ManualSwitchedService service = new ManualSwitchedService();
         final AtomicReference<Throwable> exception = Atomics.newReference();
         Thread waiter = new Thread() {
         @Override public void run() {
         try {
         service.awaitTerminated();
         fail("Expected an IllegalStateException");
         } catch (Throwable t) {
         exception.set(t);
         }
         }
         };
         waiter.start();
         service.startAsync();
         service.notifyStarted();
         assertEquals(State.RUNNING, service.state());
         service.notifyFailed(EXCEPTION);
         assertEquals(State.FAILED, service.state());
         waiter.join(100);
         assertFalse(waiter.isAlive());
         assertTrue(exception.get() instanceof IllegalStateException);
         assertEquals(EXCEPTION, exception.get().getCause());
         }
    
        public void testThreadedServiceStartAndWaitStopAndWait() throws Throwable {
         ThreadedService service = new ThreadedService();
         RecordingListener listener = RecordingListener.record(service);
         service.startAsync().awaitRunning();
         assertEquals(State.RUNNING, service.state());
    
        service.awaitRunChecks();
    
        service.stopAsync().awaitTerminated();
         assertEquals(State.TERMINATED, service.state());
    
        throwIfSet(thrownByExecutionThread);
         assertEquals(
         ImmutableList.of(
         State.STARTING,
         State.RUNNING,
         State.STOPPING,
         State.TERMINATED),
         listener.getStateHistory());
         }
    
        public void testThreadedServiceStopIdempotence() throws Throwable {
         ThreadedService service = new ThreadedService();
    
        service.startAsync().awaitRunning();
         assertEquals(State.RUNNING, service.state());
    
        service.awaitRunChecks();
    
        service.stopAsync();
         service.stopAsync().awaitTerminated();
         assertEquals(State.TERMINATED, service.state());
    
        throwIfSet(thrownByExecutionThread);
         }
    
        public void testThreadedServiceStopIdempotenceAfterWait()
         throws Throwable {
         ThreadedService service = new ThreadedService();
    
        service.startAsync().awaitRunning();
         assertEquals(State.RUNNING, service.state());
    
        service.awaitRunChecks();
    
        service.stopAsync().awaitTerminated();
         service.stopAsync();
         assertEquals(State.TERMINATED, service.state());
    
        executionThread.join();
    
        throwIfSet(thrownByExecutionThread);
         }
    
        public void testThreadedServiceStopIdempotenceDoubleWait()
         throws Throwable {
         ThreadedService service = new ThreadedService();
    
        service.startAsync().awaitRunning();
         assertEquals(State.RUNNING, service.state());
    
        service.awaitRunChecks();
    
        service.stopAsync().awaitTerminated();
         service.stopAsync().awaitTerminated();
         assertEquals(State.TERMINATED, service.state());
    
        throwIfSet(thrownByExecutionThread);
         }
    
        public void testManualServiceFailureIdempotence() {
         ManualSwitchedService service = new ManualSwitchedService();
         RecordingListener.record(service);
         service.startAsync();
         service.notifyFailed(new Exception("1"));
         service.notifyFailed(new Exception("2"));
         assertEquals("1", service.failureCause().getMessage());
         try {
         service.awaitRunning();
         fail();
         } catch (IllegalStateException e) {
         assertEquals("1", e.getCause().getMessage());
         }
         }
    
        private class ThreadedService extends AbstractService {
         final CountDownLatch hasConfirmedIsRunning = new CountDownLatch(1);
    
        /*
         * The main test thread tries to stop() the service shortly after
         * confirming that it is running. Meanwhile, the service itself is trying
         * to confirm that it is running. If the main thread's stop() call happens
         * before it has the chance, the test will fail. To avoid this, the main
         * thread calls this method, which waits until the service has performed
         * its own "running" check.
         */
         void awaitRunChecks() throws InterruptedException {
         assertTrue("Service thread hasn't finished its checks. "
         + "Exception status (possibly stale): " + thrownByExecutionThread,
         hasConfirmedIsRunning.await(10, SECONDS));
         }
    
        @Override protected void doStart() {
         assertEquals(State.STARTING, state());
         invokeOnExecutionThreadForTest(new Runnable() {
         @Override public void run() {
         assertEquals(State.STARTING, state());
         notifyStarted();
         assertEquals(State.RUNNING, state());
         hasConfirmedIsRunning.countDown();
         }
         });
         }
    
        @Override protected void doStop() {
         assertEquals(State.STOPPING, state());
         invokeOnExecutionThreadForTest(new Runnable() {
         @Override public void run() {
         assertEquals(State.STOPPING, state());
         notifyStopped();
         assertEquals(State.TERMINATED, state());
         }
         });
         }
         }
    
        private void invokeOnExecutionThreadForTest(Runnable runnable) {
         executionThread = new Thread(runnable);
         executionThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
         @Override
         public void uncaughtException(Thread thread, Throwable e) {
         thrownByExecutionThread = e;
         }
         });
         executionThread.start();
         }
    
        private static void throwIfSet(Throwable t) throws Throwable {
         if (t != null) {
         throw t;
         }
         }
    
        public void testStopUnstartedService() throws Exception {
         NoOpService service = new NoOpService();
         RecordingListener listener = RecordingListener.record(service);
    
        service.stopAsync();
         assertEquals(State.TERMINATED, service.state());
    
        try {
         service.startAsync();
         fail();
         } catch (IllegalStateException expected) {}
         assertEquals(State.TERMINATED, Iterables.getOnlyElement(listener.getStateHistory()));
         }
    
        public void testFailingServiceStartAndWait() throws Exception {
         StartFailingService service = new StartFailingService();
         RecordingListener listener = RecordingListener.record(service);
    
        try {
         service.startAsync().awaitRunning();
         fail();
         } catch (IllegalStateException e) {
         assertEquals(EXCEPTION, service.failureCause());
         assertEquals(EXCEPTION, e.getCause());
         }
         assertEquals(
         ImmutableList.of(
         State.STARTING,
         State.FAILED),
         listener.getStateHistory());
         }
    
        public void testFailingServiceStopAndWait_stopFailing() throws Exception {
         StopFailingService service = new StopFailingService();
         RecordingListener listener = RecordingListener.record(service);
    
        service.startAsync().awaitRunning();
         try {
         service.stopAsync().awaitTerminated();
         fail();
         } catch (IllegalStateException e) {
         assertEquals(EXCEPTION, service.failureCause());
         assertEquals(EXCEPTION, e.getCause());
         }
         assertEquals(
         ImmutableList.of(
         State.STARTING,
         State.RUNNING,
         State.STOPPING,
         State.FAILED),
         listener.getStateHistory());
         }
    
        public void testFailingServiceStopAndWait_runFailing() throws Exception {
         RunFailingService service = new RunFailingService();
         RecordingListener listener = RecordingListener.record(service);
    
        service.startAsync();
         try {
         service.awaitRunning();
         fail();
         } catch (IllegalStateException e) {
         assertEquals(EXCEPTION, service.failureCause());
         assertEquals(EXCEPTION, e.getCause());
         }
         assertEquals(
         ImmutableList.of(
         State.STARTING,
         State.RUNNING,
         State.FAILED),
         listener.getStateHistory());
         }
    
        public void testThrowingServiceStartAndWait() throws Exception {
         StartThrowingService service = new StartThrowingService();
         RecordingListener listener = RecordingListener.record(service);
    
        try {
         service.startAsync().awaitRunning();
         fail();
         } catch (IllegalStateException e) {
         assertEquals(service.exception, service.failureCause());
         assertEquals(service.exception, e.getCause());
         }
         assertEquals(
         ImmutableList.of(
         State.STARTING,
         State.FAILED),
         listener.getStateHistory());
         }
    
        public void testThrowingServiceStopAndWait_stopThrowing() throws Exception {
         StopThrowingService service = new StopThrowingService();
         RecordingListener listener = RecordingListener.record(service);
    
        service.startAsync().awaitRunning();
         try {
         service.stopAsync().awaitTerminated();
         fail();
         } catch (IllegalStateException e) {
         assertEquals(service.exception, service.failureCause());
         assertEquals(service.exception, e.getCause());
         }
         assertEquals(
         ImmutableList.of(
         State.STARTING,
         State.RUNNING,
         State.STOPPING,
         State.FAILED),
         listener.getStateHistory());
         }
    
        public void testThrowingServiceStopAndWait_runThrowing() throws Exception {
         RunThrowingService service = new RunThrowingService();
         RecordingListener listener = RecordingListener.record(service);
    
        service.startAsync();
         try {
         service.awaitTerminated();
         fail();
         } catch (IllegalStateException e) {
         assertEquals(service.exception, service.failureCause());
         assertEquals(service.exception, e.getCause());
         }
         assertEquals(
         ImmutableList.of(
         State.STARTING,
         State.RUNNING,
         State.FAILED),
         listener.getStateHistory());
         }
    
        public void testFailureCause_throwsIfNotFailed() {
         StopFailingService service = new StopFailingService();
         try {
         service.failureCause();
         fail();
         } catch (IllegalStateException e) {
         // expected
         }
         service.startAsync().awaitRunning();
         try {
         service.failureCause();
         fail();
         } catch (IllegalStateException e) {
         // expected
         }
         try {
         service.stopAsync().awaitTerminated();
         fail();
         } catch (IllegalStateException e) {
         assertEquals(EXCEPTION, service.failureCause());
         assertEquals(EXCEPTION, e.getCause());
         }
         }
    
        public void testAddListenerAfterFailureDoesntCauseDeadlock() throws InterruptedException {
         final StartFailingService service = new StartFailingService();
         service.startAsync();
         assertEquals(State.FAILED, service.state());
         service.addListener(new RecordingListener(service), MoreExecutors.sameThreadExecutor());
         Thread thread = new Thread() {
         @Override public void run() {
         // Internally stopAsync() grabs a lock, this could be any such method on AbstractService.
         service.stopAsync();
         }
         };
         thread.start();
         thread.join(100);
         assertFalse(thread + " is deadlocked", thread.isAlive());
         }
    
        public void testListenerDoesntDeadlockOnStartAndWaitFromRunning() throws Exception {
         final NoOpThreadedService service = new NoOpThreadedService();
         service.addListener(new Listener() {
         @Override public void running() {
         service.awaitRunning();
         }
         }, MoreExecutors.sameThreadExecutor());
         service.startAsync().awaitRunning(10, TimeUnit.MILLISECONDS);
         service.stopAsync();
         }
    
        public void testListenerDoesntDeadlockOnStopAndWaitFromTerminated() throws Exception {
         final NoOpThreadedService service = new NoOpThreadedService();
         service.addListener(new Listener() {
         @Override public void terminated(State from) {
         service.stopAsync().awaitTerminated();
         }
         }, MoreExecutors.sameThreadExecutor());
         service.startAsync().awaitRunning();
    
        Thread thread = new Thread() {
         @Override public void run() {
         service.stopAsync().awaitTerminated();
         }
         };
         thread.start();
         thread.join(100);
         assertFalse(thread + " is deadlocked", thread.isAlive());
         }
    
        private static class NoOpThreadedService extends AbstractExecutionThreadService {
         final CountDownLatch latch = new CountDownLatch(1);
         @Override protected void run() throws Exception {
         latch.await();
         }
         @Override protected void triggerShutdown() {
         latch.countDown();
         }
         }
    
        private static class StartFailingService extends AbstractService {
         @Override protected void doStart() {
         notifyFailed(EXCEPTION);
         }
    
        @Override protected void doStop() {
         fail();
         }
         }
    
        private static class RunFailingService extends AbstractService {
         @Override protected void doStart() {
         notifyStarted();
         notifyFailed(EXCEPTION);
         }
    
        @Override protected void doStop() {
         fail();
         }
         }
    
        private static class StopFailingService extends AbstractService {
         @Override protected void doStart() {
         notifyStarted();
         }
    
        @Override protected void doStop() {
         notifyFailed(EXCEPTION);
         }
         }
    
        private static class StartThrowingService extends AbstractService {
    
        final RuntimeException exception = new RuntimeException("deliberate");
    
        @Override protected void doStart() {
         throw exception;
         }
    
        @Override protected void doStop() {
         fail();
         }
         }
    
        private static class RunThrowingService extends AbstractService {
    
        final RuntimeException exception = new RuntimeException("deliberate");
    
        @Override protected void doStart() {
         notifyStarted();
         throw exception;
         }
    
        @Override protected void doStop() {
         fail();
         }
         }
    
        private static class StopThrowingService extends AbstractService {
    
        final RuntimeException exception = new RuntimeException("deliberate");
    
        @Override protected void doStart() {
         notifyStarted();
         }
    
        @Override protected void doStop() {
         throw exception;
         }
         }
    
        private static class RecordingListener extends Listener {
         static RecordingListener record(Service service) {
         RecordingListener listener = new RecordingListener(service);
         service.addListener(listener, MoreExecutors.sameThreadExecutor());
         return listener;
         }
    
        final Service service;
    
        RecordingListener(Service service) {
         this.service = service;
         }
    
        @GuardedBy("this")
         final List<State> stateHistory = Lists.newArrayList();
         final CountDownLatch completionLatch = new CountDownLatch(1);
    
        ImmutableList<State> getStateHistory() throws Exception {
         completionLatch.await();
         synchronized (this) {
         return ImmutableList.copyOf(stateHistory);
         }
         }
    
        @Override public synchronized void starting() {
         assertTrue(stateHistory.isEmpty());
         assertNotSame(State.NEW, service.state());
         stateHistory.add(State.STARTING);
         }
    
        @Override public synchronized void running() {
         assertEquals(State.STARTING, Iterables.getOnlyElement(stateHistory));
         stateHistory.add(State.RUNNING);
         service.awaitRunning();
         assertNotSame(State.STARTING, service.state());
         }
    
        @Override public synchronized void stopping(State from) {
         assertEquals(from, Iterables.getLast(stateHistory));
         stateHistory.add(State.STOPPING);
         if (from == State.STARTING) {
         try {
         service.awaitRunning();
         fail();
         } catch (IllegalStateException expected) {
         assertNull(expected.getCause());
         assertTrue(expected.getMessage().equals(
         "Expected the service to be RUNNING, but was STOPPING"));
         }
         }
         assertNotSame(from, service.state());
         }
    
        @Override public synchronized void terminated(State from) {
         assertEquals(from, Iterables.getLast(stateHistory, State.NEW));
         stateHistory.add(State.TERMINATED);
         assertEquals(State.TERMINATED, service.state());
         if (from == State.NEW) {
         try {
         service.awaitRunning();
         fail();
         } catch (IllegalStateException expected) {
         assertNull(expected.getCause());
         assertTrue(expected.getMessage().equals(
         "Expected the service to be RUNNING, but was TERMINATED"));
         }
         }
         completionLatch.countDown();
         }
    
        @Override public synchronized void failed(State from, Throwable failure) {
         assertEquals(from, Iterables.getLast(stateHistory));
         stateHistory.add(State.FAILED);
         assertEquals(State.FAILED, service.state());
         assertEquals(failure, service.failureCause());
         if (from == State.STARTING) {
         try {
         service.awaitRunning();
         fail();
         } catch (IllegalStateException e) {
         assertEquals(failure, e.getCause());
         }
         }
         try {
         service.awaitTerminated();
         fail();
         } catch (IllegalStateException e) {
         assertEquals(failure, e.getCause());
         }
         completionLatch.countDown();
         }
         }
    
        public void testNotifyStartedWhenNotStarting() {
         AbstractService service = new DefaultService();
         try {
         service.notifyStarted();
         fail();
         } catch (IllegalStateException expected) {}
         }
    
        public void testNotifyStoppedWhenNotRunning() {
         AbstractService service = new DefaultService();
         try {
         service.notifyStopped();
         fail();
         } catch (IllegalStateException expected) {}
         }
    
        public void testNotifyFailedWhenNotStarted() {
         AbstractService service = new DefaultService();
         try {
         service.notifyFailed(new Exception());
         fail();
         } catch (IllegalStateException expected) {}
         }
    
        public void testNotifyFailedWhenTerminated() {
         NoOpService service = new NoOpService();
         service.startAsync().awaitRunning();
         service.stopAsync().awaitTerminated();
         try {
         service.notifyFailed(new Exception());
         fail();
         } catch (IllegalStateException expected) {}
         }
    
        private static class DefaultService extends AbstractService {
         @Override protected void doStart() {}
         @Override protected void doStop() {}
         }
    
        private static final Exception EXCEPTION = new Exception();
        }
    

    ServiceManagerTest

        /*
         * Copyright (C) 2012 The Guava Authors
         *
         * Licensed under the Apache License, Version 2.0 (the "License");
         * you may not use this file except in compliance with the License.
         * You may obtain a copy of the License at
         *
         * http://www.apache.org/licenses/LICENSE-2.0
         *
         * Unless required by applicable law or agreed to in writing, software
         * distributed under the License is distributed on an "AS IS" BASIS,
         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
         * See the License for the specific language governing permissions and
         * limitations under the License.
         */
        package com.google.common.util.concurrent;
    
        import static java.util.Arrays.asList;
    
        import com.google.common.collect.ImmutableMap;
        import com.google.common.collect.ImmutableSet;
        import com.google.common.collect.Lists;
        import com.google.common.collect.Sets;
        import com.google.common.testing.NullPointerTester;
        import com.google.common.testing.TestLogHandler;
        import com.google.common.util.concurrent.ServiceManager.Listener;
    
        import junit.framework.TestCase;
    
        import java.util.Arrays;
        import java.util.Collection;
        import java.util.List;
        import java.util.Set;
        import java.util.concurrent.CountDownLatch;
        import java.util.concurrent.Executor;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.TimeoutException;
        import java.util.logging.Formatter;
        import java.util.logging.Level;
        import java.util.logging.LogRecord;
        import java.util.logging.Logger;
    
        /**
         * Tests for {@link ServiceManager}.
         *
         * @author Luke Sandberg
         * @author Chris Nokleberg
         */
        public class ServiceManagerTest extends TestCase {
    
        private static class NoOpService extends AbstractService {
         @Override protected void doStart() {
         notifyStarted();
         }
    
        @Override protected void doStop() {
         notifyStopped();
         }
         }
    
        /*
         * A NoOp service that will delay the startup and shutdown notification for a configurable amount
         * of time.
         */
         private static class NoOpDelayedSerivce extends NoOpService {
         private long delay;
    
        public NoOpDelayedSerivce(long delay) {
         this.delay = delay;
         }
    
        @Override protected void doStart() {
         new Thread() {
         @Override public void run() {
         Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
         notifyStarted();
         }
         }.start();
         }
    
        @Override protected void doStop() {
         new Thread() {
         @Override public void run() {
         Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
         notifyStopped();
         }
         }.start();
         }
         }
    
        private static class FailStartService extends NoOpService {
         @Override protected void doStart() {
         notifyFailed(new IllegalStateException("failed"));
         }
         }
    
        private static class FailRunService extends NoOpService {
         @Override protected void doStart() {
         super.doStart();
         notifyFailed(new IllegalStateException("failed"));
         }
         }
    
        private static class FailStopService extends NoOpService {
         @Override protected void doStop() {
         notifyFailed(new IllegalStateException("failed"));
         }
         }
    
        public void testServiceStartupTimes() {
         Service a = new NoOpDelayedSerivce(150);
         Service b = new NoOpDelayedSerivce(353);
         ServiceManager serviceManager = new ServiceManager(asList(a, b));
         serviceManager.startAsync().awaitHealthy();
         ImmutableMap<Service, Long> startupTimes = serviceManager.startupTimes();
         assertEquals(2, startupTimes.size());
         assertTrue(startupTimes.get(a) >= 150);
         assertTrue(startupTimes.get(b) >= 353);
         }
    
        public void testServiceStartStop() {
         Service a = new NoOpService();
         Service b = new NoOpService();
         ServiceManager manager = new ServiceManager(asList(a, b));
         RecordingListener listener = new RecordingListener();
         manager.addListener(listener);
         assertState(manager, Service.State.NEW, a, b);
         assertFalse(manager.isHealthy());
         manager.startAsync().awaitHealthy();
         assertState(manager, Service.State.RUNNING, a, b);
         assertTrue(manager.isHealthy());
         assertTrue(listener.healthyCalled);
         assertFalse(listener.stoppedCalled);
         assertTrue(listener.failedServices.isEmpty());
         manager.stopAsync().awaitStopped();
         assertState(manager, Service.State.TERMINATED, a, b);
         assertFalse(manager.isHealthy());
         assertTrue(listener.stoppedCalled);
         assertTrue(listener.failedServices.isEmpty());
         }
    
        public void testFailStart() throws Exception {
         Service a = new NoOpService();
         Service b = new FailStartService();
         Service c = new NoOpService();
         Service d = new FailStartService();
         Service e = new NoOpService();
         ServiceManager manager = new ServiceManager(asList(a, b, c, d, e));
         RecordingListener listener = new RecordingListener();
         manager.addListener(listener);
         assertState(manager, Service.State.NEW, a, b, c, d, e);
         try {
         manager.startAsync().awaitHealthy();
         fail();
         } catch (IllegalStateException expected) {
         }
         assertFalse(listener.healthyCalled);
         assertState(manager, Service.State.RUNNING, a, c, e);
         assertEquals(ImmutableSet.of(b, d), listener.failedServices);
         assertState(manager, Service.State.FAILED, b, d);
         assertFalse(manager.isHealthy());
    
        manager.stopAsync().awaitStopped();
         assertFalse(manager.isHealthy());
         assertFalse(listener.healthyCalled);
         assertTrue(listener.stoppedCalled);
         }
    
        public void testFailRun() throws Exception {
         Service a = new NoOpService();
         Service b = new FailRunService();
         ServiceManager manager = new ServiceManager(asList(a, b));
         RecordingListener listener = new RecordingListener();
         manager.addListener(listener);
         assertState(manager, Service.State.NEW, a, b);
         try {
         manager.startAsync().awaitHealthy();
         fail();
         } catch (IllegalStateException expected) {
         }
         assertTrue(listener.healthyCalled);
         assertEquals(ImmutableSet.of(b), listener.failedServices);
    
        manager.stopAsync().awaitStopped();
         assertState(manager, Service.State.FAILED, b);
         assertState(manager, Service.State.TERMINATED, a);
    
        assertTrue(listener.stoppedCalled);
         }
    
        public void testFailStop() throws Exception {
         Service a = new NoOpService();
         Service b = new FailStopService();
         Service c = new NoOpService();
         ServiceManager manager = new ServiceManager(asList(a, b, c));
         RecordingListener listener = new RecordingListener();
         manager.addListener(listener);
    
        manager.startAsync().awaitHealthy();
         assertTrue(listener.healthyCalled);
         assertFalse(listener.stoppedCalled);
         manager.stopAsync().awaitStopped();
    
        assertTrue(listener.stoppedCalled);
         assertEquals(ImmutableSet.of(b), listener.failedServices);
         assertState(manager, Service.State.FAILED, b);
         assertState(manager, Service.State.TERMINATED, a, c);
         }
    
        public void testToString() throws Exception {
         Service a = new NoOpService();
         Service b = new FailStartService();
         ServiceManager manager = new ServiceManager(asList(a, b));
         String toString = manager.toString();
         assertTrue(toString.contains("NoOpService"));
         assertTrue(toString.contains("FailStartService"));
         }
    
        public void testTimeouts() throws Exception {
         Service a = new NoOpDelayedSerivce(50);
         ServiceManager manager = new ServiceManager(asList(a));
         manager.startAsync();
         try {
         manager.awaitHealthy(1, TimeUnit.MILLISECONDS);
         fail();
         } catch (TimeoutException expected) {
         }
         manager.awaitHealthy(100, TimeUnit.MILLISECONDS); // no exception thrown
    
        manager.stopAsync();
         try {
         manager.awaitStopped(1, TimeUnit.MILLISECONDS);
         fail();
         } catch (TimeoutException expected) {
         }
         manager.awaitStopped(100, TimeUnit.MILLISECONDS); // no exception thrown
         }
    
        /**
         * This covers a case where if the last service to stop failed then the stopped callback would
         * never be called.
         */
         public void testSingleFailedServiceCallsStopped() {
         Service a = new FailStartService();
         ServiceManager manager = new ServiceManager(asList(a));
         RecordingListener listener = new RecordingListener();
         manager.addListener(listener);
         try {
         manager.startAsync().awaitHealthy();
         fail();
         } catch (IllegalStateException expected) {
         }
         assertTrue(listener.stoppedCalled);
         }
    
        /**
         * This covers a bug where listener.healthy would get called when a single service failed during
         * startup (it occurred in more complicated cases also).
         */
         public void testFailStart_singleServiceCallsHealthy() {
         Service a = new FailStartService();
         ServiceManager manager = new ServiceManager(asList(a));
         RecordingListener listener = new RecordingListener();
         manager.addListener(listener);
         try {
         manager.startAsync().awaitHealthy();
         fail();
         } catch (IllegalStateException expected) {
         }
         assertFalse(listener.healthyCalled);
         }
    
        /**
         * This covers a bug where if a listener was installed that would stop the manager if any service
         * fails and something failed during startup before service.start was called on all the services,
         * then awaitStopped would deadlock due to an IllegalStateException that was thrown when trying to
         * stop the timer(!).
         */
         public void testFailStart_stopOthers() throws TimeoutException {
         Service a = new FailStartService();
         Service b = new NoOpService();
         final ServiceManager manager = new ServiceManager(asList(a, b));
         manager.addListener(new Listener() {
         @Override public void failure(Service service) {
         manager.stopAsync();
         }});
         manager.startAsync();
         manager.awaitStopped(10, TimeUnit.MILLISECONDS);
         }
    
        private static void assertState(
         ServiceManager manager, Service.State state, Service... services) {
         Collection<Service> managerServices = manager.servicesByState().get(state);
         for (Service service : services) {
         assertEquals(service.toString(), state, service.state());
         assertEquals(service.toString(), service.isRunning(), state == Service.State.RUNNING);
         assertTrue(managerServices + " should contain " + service, managerServices.contains(service));
         }
         }
    
        /**
         * This is for covering a case where the ServiceManager would behave strangely if constructed
         * with no service under management. Listeners would never fire because the ServiceManager was
         * healthy and stopped at the same time. This test ensures that listeners fire and isHealthy
         * makes sense.
         */
         public void testEmptyServiceManager() {
         Logger logger = Logger.getLogger(ServiceManager.class.getName());
         logger.setLevel(Level.FINEST);
         TestLogHandler logHandler = new TestLogHandler();
         logger.addHandler(logHandler);
         ServiceManager manager = new ServiceManager(Arrays.<Service>asList());
         RecordingListener listener = new RecordingListener();
         manager.addListener(listener, MoreExecutors.sameThreadExecutor());
         manager.startAsync().awaitHealthy();
         assertTrue(manager.isHealthy());
         assertTrue(listener.healthyCalled);
         assertFalse(listener.stoppedCalled);
         assertTrue(listener.failedServices.isEmpty());
         manager.stopAsync().awaitStopped();
         assertFalse(manager.isHealthy());
         assertTrue(listener.stoppedCalled);
         assertTrue(listener.failedServices.isEmpty());
         // check that our NoOpService is not directly observable via any of the inspection methods or
         // via logging.
         assertEquals("ServiceManager{services=[]}", manager.toString());
         assertTrue(manager.servicesByState().isEmpty());
         assertTrue(manager.startupTimes().isEmpty());
         Formatter logFormatter = new Formatter() {
         @Override public String format(LogRecord record) {
         return formatMessage(record);
         }
         };
         for (LogRecord record : logHandler.getStoredLogRecords()) {
         assertFalse(logFormatter.format(record).contains("NoOpService"));
         }
         }
    
        /**
         * This is for a case where a long running Listener using the sameThreadListener could deadlock
         * another thread calling stopAsync().
         */
    
        public void testListenerDeadlock() throws InterruptedException {
         final CountDownLatch failEnter = new CountDownLatch(1);
         Service failRunService = new AbstractService() {
         @Override protected void doStart() {
         new Thread() {
         @Override public void run() {
         notifyStarted();
         notifyFailed(new Exception("boom"));
         }
         }.start();
         }
         @Override protected void doStop() {
         notifyStopped();
         }
         };
         final ServiceManager manager = new ServiceManager(
         Arrays.asList(failRunService, new NoOpService()));
         manager.addListener(new ServiceManager.Listener() {
         @Override public void failure(Service service) {
         failEnter.countDown();
         // block forever!
         Uninterruptibles.awaitUninterruptibly(new CountDownLatch(1));
         }
         }, MoreExecutors.sameThreadExecutor());
         // We do not call awaitHealthy because, due to races, that method may throw an exception. But
         // we really just want to wait for the thread to be in the failure callback so we wait for that
         // explicitly instead.
         manager.startAsync();
         failEnter.await();
         assertFalse("State should be updated before calling listeners", manager.isHealthy());
         // now we want to stop the services.
         Thread stoppingThread = new Thread() {
         @Override public void run() {
         manager.stopAsync().awaitStopped();
         }
         };
         stoppingThread.start();
         // this should be super fast since the only non stopped service is a NoOpService
         stoppingThread.join(1000);
         assertFalse("stopAsync has deadlocked!.", stoppingThread.isAlive());
         }
    
        /**
         * Catches a bug where when constructing a service manager failed, later interactions with the
         * service could cause IllegalStateExceptions inside the partially constructed ServiceManager.
         * This ISE wouldn't actually bubble up but would get logged by ExecutionQueue. This obfuscated
         * the original error (which was not constructing ServiceManager correctly).
         */
         public void testPartiallyConstructedManager() {
         Logger logger = Logger.getLogger("global");
         logger.setLevel(Level.FINEST);
         TestLogHandler logHandler = new TestLogHandler();
         logger.addHandler(logHandler);
         NoOpService service = new NoOpService();
         service.startAsync();
         try {
         new ServiceManager(Arrays.asList(service));
         fail();
         } catch (IllegalArgumentException expected) {}
         service.stopAsync();
         // Nothing was logged!
         assertEquals(0, logHandler.getStoredLogRecords().size());
         }
    
        public void testPartiallyConstructedManager_transitionAfterAddListenerBeforeStateIsReady() {
         // The implementation of this test is pretty sensitive to the implementation <img src="http://ifeve.com/wp-includes/images/smilies/frownie.png" alt=":(" class="wp-smiley" style="height: 1em; max-height: 1em;"> but we want to
         // ensure that if weird things happen during construction then we get exceptions.
         final NoOpService service1 = new NoOpService();
         // This service will start service1 when addListener is called. This simulates service1 being
         // started asynchronously.
         Service service2 = new Service() {
         final NoOpService delegate = new NoOpService();
         @Override public final void addListener(Listener listener, Executor executor) {
         service1.startAsync();
         delegate.addListener(listener, executor);
         }
         // Delegates from here on down
         @Override public final Service startAsync() {
         return delegate.startAsync();
         }
    
        @Override public final Service stopAsync() {
         return delegate.stopAsync();
         }
    
        @Override public final ListenableFuture<State> start() {
         return delegate.start();
         }
    
        @Override public final ListenableFuture<State> stop() {
         return delegate.stop();
         }
    
        @Override public State startAndWait() {
         return delegate.startAndWait();
         }
    
        @Override public State stopAndWait() {
         return delegate.stopAndWait();
         }
    
        @Override public final void awaitRunning() {
         delegate.awaitRunning();
         }
    
        @Override public final void awaitRunning(long timeout, TimeUnit unit)
         throws TimeoutException {
         delegate.awaitRunning(timeout, unit);
         }
    
        @Override public final void awaitTerminated() {
         delegate.awaitTerminated();
         }
    
        @Override public final void awaitTerminated(long timeout, TimeUnit unit)
         throws TimeoutException {
         delegate.awaitTerminated(timeout, unit);
         }
    
        @Override public final boolean isRunning() {
         return delegate.isRunning();
         }
    
        @Override public final State state() {
         return delegate.state();
         }
    
        @Override public final Throwable failureCause() {
         return delegate.failureCause();
         }
         };
         try {
         new ServiceManager(Arrays.asList(service1, service2));
         fail();
         } catch (IllegalArgumentException expected) {
         assertTrue(expected.getMessage().contains("started transitioning asynchronously"));
         }
         }
    
        /**
         * This test is for a case where two Service.Listener callbacks for the same service would call
         * transitionService in the wrong order due to a race. Due to the fact that it is a race this
         * test isn't guaranteed to expose the issue, but it is at least likely to become flaky if the
         * race sneaks back in, and in this case flaky means something is definitely wrong.
         *
         * <p>Before the bug was fixed this test would fail at least 30% of the time.
         */
    
        public void testTransitionRace() throws TimeoutException {
         for (int k = 0; k < 1000; k++) {
         List<Service> services = Lists.newArrayList();
         for (int i = 0; i < 5; i++) {
         services.add(new SnappyShutdownService(i));
         }
         ServiceManager manager = new ServiceManager(services);
         manager.startAsync().awaitHealthy();
         manager.stopAsync().awaitStopped(1, TimeUnit.SECONDS);
         }
         }
    
        /**
         * This service will shutdown very quickly after stopAsync is called and uses a background thread
         * so that we know that the stopping() listeners will execute on a different thread than the
         * terminated() listeners.
         */
         private static class SnappyShutdownService extends AbstractExecutionThreadService {
         final int index;
         final CountDownLatch latch = new CountDownLatch(1);
    
        SnappyShutdownService(int index) {
         this.index = index;
         }
    
        @Override protected void run() throws Exception {
         latch.await();
         }
    
        @Override protected void triggerShutdown() {
         latch.countDown();
         }
    
        @Override protected String serviceName() {
         return this.getClass().getSimpleName() + "[" + index + "]";
         }
         }
    
        public void testNulls() {
         ServiceManager manager = new ServiceManager(Arrays.<Service>asList());
         new NullPointerTester()
         .setDefault(ServiceManager.Listener.class, new RecordingListener())
         .testAllPublicInstanceMethods(manager);
         }
    
        private static final class RecordingListener extends ServiceManager.Listener {
         volatile boolean healthyCalled;
         volatile boolean stoppedCalled;
         final Set<Service> failedServices = Sets.newConcurrentHashSet();
    
        @Override public void healthy() {
         healthyCalled = true;
         }
    
        @Override public void stopped() {
         stoppedCalled = true;
         }
    
        @Override public void failure(Service service) {
         failedServices.add(service);
         }
         }
        }
    

    相关文章

      网友评论

          本文标题:Google-Guava Concurrent 包里的 Serv

          本文链接:https://www.haomeiwen.com/subject/skiffctx.html