
Spring Cloud Feign retry: a source code analysis

Posted on: September 5, 2023 at 09:13 PM

Feign in Spring Cloud has no retry mechanism of its own; it reuses Ribbon's retry logic instead.

RetryTemplate

  1. Retry logic is generally implemented with the retry template org.springframework.retry.support.RetryTemplate
  2. Its core logic lives in the method org.springframework.retry.support.RetryTemplate#doExecute

Retry logic

// Loop while the policy allows a retry and the context has not been marked exhausted
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

	try {
		if (this.logger.isDebugEnabled()) {
			this.logger.debug("Retry: count=" + context.getRetryCount());
		}
		// Reset the last exception, so if we are successful
		// the close interceptors will not think we failed...
		lastException = null;
		// Run the business logic (the retry callback)
		return retryCallback.doWithRetry(context);
	}
	catch (Throwable e) {

		lastException = e;

		try {
			// Handle the exception; this is where the retry count is incremented
			registerThrowable(retryPolicy, state, context, e);
		}
		catch (Exception ex) {
			throw new TerminatedRetryException("Could not register throwable",
					ex);
		}
		finally {
			doOnErrorInterceptors(retryCallback, context, e);
		}

		if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
			try {
				backOffPolicy.backOff(backOffContext);
			}
			catch (BackOffInterruptedException ex) {
				lastException = e;
				// back off was prevented by another thread - fail the retry
				if (this.logger.isDebugEnabled()) {
					this.logger
							.debug("Abort retry because interrupted: count="
									+ context.getRetryCount());
				}
				throw ex;
			}
		}

		if (this.logger.isDebugEnabled()) {
			this.logger.debug(
					"Checking for rethrow: count=" + context.getRetryCount());
		}

		if (shouldRethrow(retryPolicy, context, state)) {
			if (this.logger.isDebugEnabled()) {
				this.logger.debug("Rethrow in retry for policy: count="
						+ context.getRetryCount());
			}
			throw RetryTemplate.<E>wrapIfNecessary(e);
		}

	}

	/*
	 * A stateful attempt that can retry may rethrow the exception before now,
	 * but if we get this far in a stateful retry there's a reason for it,
	 * like a circuit breaker or a rollback classifier.
	 */
	if (state != null && context.hasAttribute(GLOBAL_STATE)) {
		break;
	}
}

Here canRetry delegates the decision to org.springframework.retry.RetryPolicy:

protected boolean canRetry(RetryPolicy retryPolicy, RetryContext context) {
	return retryPolicy.canRetry(context);
}
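
The shape of this loop can be reduced to a few lines of plain Java. What follows is a minimal sketch, not the spring-retry implementation: SimplePolicy, RetryLoopSketch and execute are hypothetical names, and back-off, interceptors and stateful retry are left out. It only illustrates that the loop asks the policy before every attempt, and that the policy's answer can change because each failure is registered with it.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a retry policy; NOT the spring-retry RetryPolicy interface.
class SimplePolicy {
    private final int maxAttempts;
    private int count; // number of failures registered so far

    SimplePolicy(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    boolean canRetry() {
        return count < maxAttempts;
    }

    void registerThrowable(Throwable t) {
        if (t != null) {
            count++;
        }
    }
}

class RetryLoopSketch {
    // Runs the callback until it succeeds or the policy refuses another attempt.
    static <T> T execute(SimplePolicy policy, Supplier<T> callback) {
        RuntimeException last = null;
        while (policy.canRetry()) {
            try {
                return callback.get();
            } catch (RuntimeException e) {
                last = e;
                // Registering the failure is what eventually makes canRetry() return false.
                policy.registerThrowable(e);
            }
        }
        throw last != null ? last : new IllegalStateException("no attempt was allowed");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = execute(new SimplePolicy(3), () -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("attempt " + calls[0] + " failed");
            }
            return "ok";
        });
        System.out.println(result + " after " + calls[0] + " calls"); // ok after 3 calls
    }
}
```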

FeignRetryPolicy

In Feign, org.springframework.retry.RetryPolicy is implemented by org.springframework.cloud.netflix.feign.ribbon.FeignRetryPolicy:

@Override
public boolean canRetry(RetryContext context) {
	if(context.getRetryCount() == 0) {
		return true;
	}
	return super.canRetry(context);
}

On the first call it returns true directly; only from the second call onward does it delegate to its parent class, org.springframework.cloud.client.loadbalancer.InterceptorRetryPolicy.

InterceptorRetryPolicy

@Override
public boolean canRetry(RetryContext context) {
    LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext)context;
    if(lbContext.getRetryCount() == 0  && lbContext.getServiceInstance() == null) {
        //We haven't even tried to make the request yet so return true so we do
        lbContext.setServiceInstance(serviceInstanceChooser.choose(serviceName));
        return true;
    }
    return policy.canRetryNextServer(lbContext);
}

This in turn delegates to org.springframework.cloud.netflix.ribbon.RibbonLoadBalancedRetryPolicy.

RibbonLoadBalancedRetryPolicy

The core method is canRetryNextServer().

It simply checks that the current nextServerCount is less than or equal to getMaxRetriesOnNextServer() and that the request itself is retryable:

@Override
public boolean canRetryNextServer(LoadBalancedRetryContext context) {
	//this will be called after a failure occurs and we increment the counter
	//so we check that the count is less than or equals to too make sure
	//we try the next server the right number of times
	return nextServerCount <= lbContext.getRetryHandler().getMaxRetriesOnNextServer() && canRetry(context);
}

Here getMaxRetriesOnNextServer() is taken from the ribbon.MaxAutoRetriesNextServer setting, which defaults to 1.

The method org.springframework.cloud.netflix.ribbon.RibbonLoadBalancedRetryPolicy#canRetry():

public boolean canRetry(LoadBalancedRetryContext context) {
	HttpMethod method = context.getRequest().getMethod();
	return HttpMethod.GET == method || lbContext.isOkToRetryOnAllOperations();
}

This method only checks whether the request is a GET or whether ribbon.OkToRetryOnAllOperations is set; that setting defaults to false.
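
To make the combined effect of the two checks concrete, here is a small sketch that models them as pure functions. It is an illustration only: NextServerCheckSketch and its parameters are hypothetical names, the HTTP method is passed as a plain string, and the real policy reads these values from the Ribbon context rather than from arguments.

```java
// Hypothetical, simplified model of the two checks; not the Spring Cloud classes.
class NextServerCheckSketch {
    // GET-or-flag rule: only GET is retried unless the flag is switched on.
    static boolean canRetry(String httpMethod, boolean okToRetryOnAllOperations) {
        return "GET".equals(httpMethod) || okToRetryOnAllOperations;
    }

    // The counter check and the GET-or-flag rule must both pass.
    static boolean canRetryNextServer(int nextServerCount, int maxRetriesOnNextServer,
                                      String httpMethod, boolean okToRetryOnAllOperations) {
        return nextServerCount <= maxRetriesOnNextServer
                && canRetry(httpMethod, okToRetryOnAllOperations);
    }

    public static void main(String[] args) {
        System.out.println(canRetryNextServer(1, 1, "GET", false));  // true
        System.out.println(canRetryNextServer(2, 1, "GET", false));  // false
        System.out.println(canRetryNextServer(0, 1, "POST", false)); // false
        System.out.println(canRetryNextServer(0, 1, "POST", true));  // true
    }
}
```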

Retrying on exceptions

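From the code above it looks as if, when exceptions keep occurring, the condition of the while loop stays true forever and the loop can never end. In fact, a key piece of the logic is implemented in registerThrowable().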

registerThrowable()

Every exception is passed to this method:

protected void registerThrowable(RetryPolicy retryPolicy, RetryState state,
		RetryContext context, Throwable e) {
	retryPolicy.registerThrowable(context, e);
	registerContext(context, state);
}

The call is delegated to InterceptorRetryPolicy, whose registerThrowable() does the following:

LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
//this is important as it registers the last exception in the context and also increases the retry count
lbContext.registerThrowable(throwable);
//let the policy know about the exception as well
policy.registerThrowable(lbContext, throwable);

Both lines of code matter:

First line

The first line runs registerThrowable() in RetryContextSupport:

public void registerThrowable(Throwable throwable) {
	this.lastException = throwable;
	if (throwable != null)
		count++;
}

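It simply increments count, which is the value read by the first check in FeignRetryPolicy#canRetry(). Once a failure has been registered, getRetryCount() is no longer 0, so the shortcut below stops applying: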

if(context.getRetryCount() == 0) {
	return true;
}

Second line

The second line runs registerThrowable() in RibbonLoadBalancedRetryPolicy:

@Override
public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
	//if this is a circuit tripping exception then notify the load balancer
	if (lbContext.getRetryHandler().isCircuitTrippingException(throwable)) {
		updateServerInstanceStats(context);
	}
	
	//Check if we need to ask the load balancer for a new server.
	//Do this before we increment the counters because the first call to this method
	//is not a retry it is just an initial failure.
	if(!canRetrySameServer(context)  && canRetryNextServer(context)) {
		context.setServiceInstance(loadBalanceChooser.choose(serviceId));
	}
	//This method is called regardless of whether we are retrying or making the first request.
	//Since we do not count the initial request in the retry count we don't reset the counter
	//until we actually equal the same server count limit.  This will allow us to make the initial
	//request plus the right number of retries.
	if(sameServerCount >= lbContext.getRetryHandler().getMaxRetriesOnSameServer() && canRetry(context)) {
		//reset same server since we are moving to a new server
		sameServerCount = 0;
		nextServerCount++;
		if(!canRetryNextServer(context)) {
			context.setExhaustedOnly();
		}
	} else {
		sameServerCount++;
	}
}

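In the end this comes down to updating the nextServerCount field, which is what FeignRetryPolicy#canRetry() relies on from its second call onward, through canRetryNextServer().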
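
The interplay of the counters is easier to follow when traced. The sketch below is a hypothetical model (RibbonCounterSketch is not a Spring Cloud class): it copies the branch structure of the two registerThrowable() methods and of canRetry(), assumes every attempt fails, and leaves out server selection, load balancer statistics and back-off. The maxSame parameter stands in for getMaxRetriesOnSameServer() and maxNext for getMaxRetriesOnNextServer().

```java
// Hypothetical model of the retry counters; an illustration, not the real policy.
class RibbonCounterSketch {
    final int maxSame;       // stands in for getMaxRetriesOnSameServer()
    final int maxNext;       // stands in for getMaxRetriesOnNextServer()
    final boolean retryable; // GET, or OkToRetryOnAllOperations switched on

    int retryCount;          // the count kept by the retry context
    int sameServerCount;
    int nextServerCount;
    boolean exhausted;       // models setExhaustedOnly()

    RibbonCounterSketch(int maxSame, int maxNext, boolean retryable) {
        this.maxSame = maxSame;
        this.maxNext = maxNext;
        this.retryable = retryable;
    }

    boolean canRetryNextServer() {
        return nextServerCount <= maxNext && retryable;
    }

    // Models FeignRetryPolicy#canRetry: the first attempt is always allowed.
    boolean canRetry() {
        return retryCount == 0 || canRetryNextServer();
    }

    void registerThrowable() {
        retryCount++; // "first line": the context counts the failure
        // "second line": the Ribbon policy updates its own counters
        if (sameServerCount >= maxSame && retryable) {
            sameServerCount = 0;
            nextServerCount++;
            if (!canRetryNextServer()) {
                exhausted = true;
            }
        } else {
            sameServerCount++;
        }
    }

    // Counts how many requests are sent when every attempt fails.
    static int attemptsWhenAlwaysFailing(int maxSame, int maxNext, boolean retryable) {
        RibbonCounterSketch s = new RibbonCounterSketch(maxSame, maxNext, retryable);
        int attempts = 0;
        while (s.canRetry() && !s.exhausted) {
            attempts++;
            s.registerThrowable();
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(attemptsWhenAlwaysFailing(0, 1, true));  // 2
        System.out.println(attemptsWhenAlwaysFailing(1, 1, true));  // 4
        System.out.println(attemptsWhenAlwaysFailing(0, 1, false)); // 1
    }
}
```

In this model a retryable request that always fails is sent (maxSame + 1) * (maxNext + 1) times, while a non-retryable one (for example a POST with OkToRetryOnAllOperations left at false) is sent once: the first attempt passes because the retry count is still 0, and the next check fails in canRetryNextServer().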