作者:京東物流 張小龍
本文通過介紹分布式應用下各個場景的全局日志ID透傳思路,以及介紹分布式日志追蹤ID簡單實現原理和實戰效果,從而達到通過提高日志查詢排查問題的效率。
背景
開發排查系統問題用得最多的手段就是查看系統日志,相信不少人都值過班當過小秘吧:給下接口和出入參吧,麻煩看看日志里的有沒有異常信息啊等等,但是在并發大時使用日志定位問題還是比較麻煩,由于大量的其他用戶/其他線程的日志也一起輸出穿行其中導致很難篩選出指定請求的全部相關日志,以及下游線程/服務對應的日志,甚至一些特殊場景的出入參只打印了一些諸如gis坐標、四級地址等沒有單據信息的日志,使得日志定位起來非常不便
場景分析
自己所在組負責的系統主要是web應用,其中涉及到的請求方式主要有:springmvc的servlet的http場景、jsf場景、MQ場景、resteasy場景、clover場景、easyjob場景,每一種場景都需要不同的方式進行logTraceId的透傳,接下來逐個探析上述各個場景的透傳方案。
在這之前我們先要簡單了解一下日志中透傳和打印logTraceId的方式,一般我們使用MDC進行logTraceId的透傳與打印,但是基于MDC內部使用的是ThreadLocal所以只有本線程才有效,子線程服務的MDC里的值會丟失,所以這里我們要么是在所有涉及到父子線程的地方以編碼侵入式自行實現值的傳遞,要么就是通過覆寫MDCAdapter:通過阿里的TransmittableThreadLocal來解決父子線程傳遞問題,而本文采用的是比較粗糙地以編碼侵入式來解決此問題。
springmvc的servlet的http場景
這個場景相信大家都已經爛熟到骨子里了,主要思路是通過攔截器的方式進行logTraceId的透傳,新建一個類實現HandlerInterceptor
preHandle:在業務處理器處理請求之前被調用,這里實現logTraceId的設置與透傳
postHandle:在業務處理器處理請求執行完成后,生成視圖之前執行,這里空實現就好
afterCompletion:在DispatcherServlet完全處理完請求后被調用,這里用于清除MDC的logTraceId
@Slf4j
public class TraceInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object o) throws Exception {
try{
String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
if (StringUtils.isBlank(traceId)) {
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, TraceUtils.getTraceId());
}
}catch (RuntimeException e){
log.error("mvc自定義log跟蹤攔截器執行異常",e);
}
return true;
}
@Override
public void postHandle(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
}
@Override
public void afterCompletion(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
try{
MDC.clear();
}catch (RuntimeException ex){
log.error("mvc自定義log跟蹤攔截器執行異常",ex);
}
}
}
jsf場景
相信大家對于jsf并不陌生,而jsf也支持自定義filter,基于jsf過濾器的運行方式,可以通過配置全局過濾器(繼承AbstractFilter)的方式進行logTraceId的透傳,需要注意的是jsf是在線程池中執行的所以一定要信任消息體中的logTraceId
jsf消費者過濾器:主要從上下文環境中獲取logTraceId并進行透傳,實現代碼如下
@Slf4j
public class TraceIdGlobalJsfFilter extends AbstractFilter {
@Override
public ResponseMessage invoke(RequestMessage requestMessage) {
//設置traceId
setAndGetTraceId(requestMessage);
try{
return this.getNext().invoke(requestMessage);
}finally {
}
}
/**
* 設置并返回traceId
* @param requestMessage
* @return
*/
private void setAndGetTraceId(RequestMessage requestMessage) {
try{
String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY);
if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){
//如果filter和MDC都沒有獲取到則說明有遺漏,打印日志
if(log.isDebugEnabled()){
log.debug("jsf消費者自定義log跟蹤攔截器預警,filter和MDC都沒有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
}
} else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) {
//如果MDC沒有,filter有,打印日志
if(log.isDebugEnabled()){
log.debug("jsf消費者自定義log跟蹤攔截器預警,MDC沒有filter有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
}
} else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){
//如果MDC有,filter沒有,說明是源頭已經有了,但是jsf是第一次調,透傳
requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, logTraceId);
}else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null){
//MDC和fitler都有,但是并不相等,則存在問題打印日志
if(log.isDebugEnabled()){
log.debug("jsf消費者自定義log跟蹤攔截器預警,MDC和filter都有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
}
}
}catch (RuntimeException e){
log.error("jsf消費者自定義log跟蹤攔截器執行異常",e);
}
}
}
jsf提供者過濾器:通過拿到消費者在消息體中透傳的logTraceId來實現,實現代碼如下
@Slf4j
public class TraceIdGlobalJsfProducerFilter extends AbstractFilter {
@Override
public ResponseMessage invoke(RequestMessage requestMessage) {
//設置traceId
boolean isNeedClearMdc = transferTraceId(requestMessage);
try{
return this.getNext().invoke(requestMessage);
}finally {
if(isNeedClearMdc){
clear();
}
}
}
/**
* 設置并返回traceId
* @param requestMessage
* @return
*/
private boolean transferTraceId(RequestMessage requestMessage) {
boolean isNeedClearMdc = false;
try{
String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY);
if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){
//如果filter和MDC都沒有獲取到,說明存在遺漏場景或是提供給外部系統調用的接口,打印日志進行觀察
String traceId = TraceUtils.getTraceId();
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,traceId);
requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, traceId);
if(log.isDebugEnabled()){
log.debug("jsf生產者自定義log跟蹤攔截器預警,filter和MDC都沒有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
}
isNeedClearMdc = true;
} else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) {
//如果MDC沒有,filter有,說明是被調用方,需要透傳下去
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceIdObj.toString());
isNeedClearMdc = true;
} else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){
//如果MDC有,filter沒有,存在問題,打印日志
if(log.isDebugEnabled()){
log.debug("jsf生產者自定義log跟蹤攔截器預警,MDC有filter沒有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
}
isNeedClearMdc = true;
}else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null && !logTraceId.equals(logTraceIdObj.toString())){
//MDC和fitler都有,但是并不相等,則信任filter透傳結果
TraceUtils.resetTraceId(logTraceIdObj.toString());
if(log.isDebugEnabled()){
log.debug("jsf生產者自定義log跟蹤攔截器預警,MDC和fitler都有traceId,但是并不相等,jsf信息:{}", JSON.toJSONString(requestMessage));
}
}
return isNeedClearMdc;
}catch (RuntimeException e){
log.error("jsf生產者自定義log跟蹤攔截器執行異常",e);
return false;
}
}
/**
* 清除MDC
*/
private void clear() {
try{
MDC.clear();
}catch (RuntimeException e){
log.error("jsf生產者自定義log跟蹤攔截器執行異常",e);
}
}
}
MQ場景
說到MQ相信大家對于此就更不陌生了,此種場景主要通過在提供者發送消息時拿到上下文中的logTraceId,將其以擴展信息的方式設置進消息體中進行透傳,而消費者則從消息體中進行獲取
生產者:新建一個抽象類繼承MessageProducer,覆寫父類中的兩個send方法(批量發送、單條發送),send方法中主要調用抽象加工消息體的方法(logTraceId屬性賦值)和日志打印,在子類中進行發送前對消息體的加工處理,具體代碼如下
@Slf4j
public abstract class BaseTraceIdProducer extends MessageProducer {
private static final String SEPARATOR_COMMA = ",";
public BaseTraceIdProducer() {
}
public BaseTraceIdProducer(TransportManager transportManager) {
super(transportManager);
}
/**
* 獲取消息體-單個
* @param messageContext
* @return
*/
protected abstract Message getMessage(MessageContext messageContext);
/** 獲取消息體-批量
*
* @param messageContext
* @return
*/
protected abstract List getMessages(MessageContext messageContext);
/**
* 填充消息體上下文信息
* @param message
* @param messageContext
*/
protected void fillContext(Message message,MessageContext messageContext) {
if(message == null){
return;
}
if(StringUtils.isBlank(messageContext.getLogTraceId())){
String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY);
messageContext.setLogTraceId(logTraceId);
}
if(StringUtils.isBlank(messageContext.getTopic())){
String topic = message.getTopic();
messageContext.setTopic(topic);
}
String businessId = message.getBusinessId();
messageContext.getBusinessIdBuf().append(SEPARATOR_COMMA).append(businessId);
}
/**
* traceId嵌入消息體中
* @param message
*/
protected void generateTraceIdIntoMessage(Message message){
if(message == null){
return;
}
try{
String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
if(StringUtils.isBlank(logTraceId)){
logTraceId = TraceUtils.getTraceId();
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId);
}
message.setAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY,logTraceId);
}catch (RuntimeException e){
log.error("jmq2自定義log跟蹤攔截器執行異常",e);
}
}
/**
* 批量發送消息-無回調
* @param messages
* @param timeout
* @throws JMQException
*/
public void send(List messages, int timeout) throws JMQException {
MessageContext messageContext = new MessageContext();
messageContext.setMessages(messages);
List messageList = this.getMessages(messageContext);
//打印日志,方便排查問題
printLog(messageContext);
super.send(messageList, timeout);
}
/**
* 單個發送消息
* @param message
* @param transaction
* @param
* @return
* @throws JMQException
*/
public T send(Message message, LocalTransaction transaction) throws JMQException {
MessageContext messageContext = new MessageContext();
messageContext.setMessage(message);
Message msg = this.getMessage(messageContext);
//打印日志,方便排查問題
printLog(messageContext);
return super.send(msg, transaction);
}
/**
* 批量發送消息-有回調
* @param messages
* @param timeout
* @param callback
* @throws JMQException
*/
public void send(List messages, int timeout, AsyncSendCallback callback) throws JMQException {
MessageContext messageContext = new MessageContext();
messageContext.setMessages(messages);
List messageList = this.getMessages(messageContext);
//打印日志,方便排查問題
printLog(messageContext);
super.send(messageList, timeout, callback);
}
/**
* 打印日志,方便排查問題
* @param messageContext
*/
private void printLog(MessageContext messageContext) {
if(messageContext==null){
return;
}
if(log.isInfoEnabled()){
log.info("MQ發送:traceId:{},topic:{},businessIds:[{}]",messageContext.getLogTraceId(),messageContext.getTopic(),messageContext.getBusinessIdBuf()==null?"":messageContext.getBusinessIdBuf().toString());
}
}
}
@Slf4j
public class TraceIdEnvMessageProducer extends BaseTraceIdProducer {
private static final String UAT_TRUE = String.valueOf(true);
private boolean uat = false;
public TraceIdEnvMessageProducer() {
}
public TraceIdEnvMessageProducer(TransportManager transportManager) {
super(transportManager);
}
/**
* 環境變量打標-單個消息體
* @param message
*/
private void convertUatMessage(Message message) {
if (message != null) {
message.setAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT, UAT_TRUE);
}
}
/**
* 消息轉換-批量消息體
* @param messageContext
* @return
*/
private List convertMessages(MessageContext messageContext) {
List messages = messageContext.getMessages();
if (!CollectionUtils.isEmpty(messages)) {
Iterator messageIterator = messages.iterator();
while(messageIterator.hasNext()) {
Message message = (Message)messageIterator.next();
if(this.isUat()){
this.convertUatMessage(message);
}
super.generateTraceIdIntoMessage(message);
super.fillContext(message,messageContext);
}
}
return messageContext.getMessages();
}
/**
* 消息轉換-單個消息體
* @param messageContext
* @return
*/
private Message convertMessage(MessageContext messageContext){
Message message = messageContext.getMessage();
if(this.isUat()){
this.convertUatMessage(message);
}
super.generateTraceIdIntoMessage(message);
super.fillContext(message,messageContext);
return message;
}
protected Message getMessage(MessageContext messageContext) {
if(log.isDebugEnabled()){
log.debug("current environment is UAT : {}", this.isUat());
}
return this.convertMessage(messageContext);
}
protected List getMessages(MessageContext messageContext) {
if(log.isDebugEnabled()){
log.debug("current environment is UAT : {}", this.isUat());
}
return this.convertMessages(messageContext);
}
public void setUat(boolean uat) {
this.uat = uat;
}
boolean isUat() {
return this.uat;
}
}
消費者:新建一個抽象類繼承MessageListener,覆寫父類中的onMessage方法,主要進行設置日志traceId和消費完成后的traceId清理等,而在子類中進行一些自定義處理,具體代碼如下
@Slf4j
public abstract class BaseTraceIdMessageListener implements MessageListener {
public BaseTraceIdMessageListener() {
}
public abstract void onMessageList(List messages) throws Exception;
@Override
public final void onMessage(List messages) throws Exception {
try{
if(CollectionUtils.isEmpty(messages)){
return;
}
//設置日志traceId
setLogTraceId(messages);
this.onMessageList(messages);
//消費完后清除traceId
clear();
}catch (Exception e){
throw e;
}finally {
MDC.clear();
}
}
/**
* 設置日志traceId
* @param messages
*/
private void setLogTraceId(List messages) {
try{
Message message = messages.get(0);
String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY);
if(StringUtils.isBlank(logTraceId)){
logTraceId = TraceUtils.getTraceId();
}
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId);
}catch (RuntimeException e){
log.error("jmq2自定義log跟蹤攔截器執行異常",e);
}
}
/**
* 清除traceId
*/
private void clear() {
try{
MDC.clear();
}catch (RuntimeException e){
log.error("jmq2自定義log跟蹤攔截器執行異常",e);
}
}
}
@Slf4j
public abstract class TraceIdEnvMessageListener extends BaseTraceIdMessageListener{
private String uat;
public TraceIdEnvMessageListener() {
}
public abstract void onMessages(List var1) throws Exception;
@Override
public void onMessageList(List messages) throws Exception {
Iterator iterator;
Message message;
if (this.getUat() != null && Boolean.valueOf(this.getUat())) {
iterator = messages.iterator();
while(true) {
while(iterator.hasNext()) {
message = (Message)iterator.next();
if (message != null && Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) {
this.onMessages(Arrays.asList(message));
} else {
log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());
}
}
return;
}
} else if (this.getUat() != null && !Boolean.valueOf(this.getUat())) {
iterator = messages.iterator();
while(true) {
while(iterator.hasNext()) {
message = (Message)iterator.next();
if (message != null && !Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) {
this.onMessages(Arrays.asList(message));
} else {
log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());
}
}
return;
}
} else {
this.onMessages(messages);
}
}
public void setUat(String uat) {
if (!"true".equals(uat) && !"false".equals(uat)) {
throw new IllegalArgumentException("uat 屬性值只能為 true 或 false.");
} else {
this.uat = uat;
}
}
public String getUat() {
return this.uat;
}
}
resteasy場景
此場景類似于spinrg-mvc場景,也是http請求,需要通過攔截器在消息頭中進行logTraceId的透傳,主要有客戶端攔截器,服務端:預處理攔截器、后置攔截器,代碼如下
@ClientInterceptor
@Provider
@Slf4j
public class ResteasyClientInterceptor implements ClientExecutionInterceptor {
@Override
public ClientResponse execute(ClientExecutionContext clientExecutionContext) throws Exception {
try{
String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
ClientRequest request = clientExecutionContext.getRequest();
String headerTraceId = request.getHeaders().getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY);
if(StringUtils.isBlank(logTraceId) && StringUtils.isBlank(headerTraceId)){
//如果filter和MDC都沒有獲取到則說明是調用源頭
String traceId = TraceUtils.getTraceId();
TraceUtils.resetTraceId(traceId);
request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,traceId);
} else if(StringUtils.isBlank(headerTraceId)){
//如果MDC有但是filter沒有則需要傳遞
request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,logTraceId);
}
}catch (RuntimeException e){
log.error("resteasy客戶端log跟蹤攔截器執行異常",e);
}
return clientExecutionContext.proceed();
}
}
@Slf4j
@Provider
@ServerInterceptor
public class RestEasyPreInterceptor implements PreProcessInterceptor {
@Override
public ServerResponse preProcess(HttpRequest request, ResourceMethod resourceMethod) throws Failure, WebApplicationException {
try{
MultivaluedMap requestHeaders = request.getHttpHeaders().getRequestHeaders();
String headerTraceId = requestHeaders.getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY);
if(StringUtils.isNotBlank(headerTraceId)){
//如果filter則透傳
TraceUtils.resetTraceId(headerTraceId);
}
}catch (RuntimeException e){
log.error("resteasy服務端log跟蹤前置攔截器執行異常",e);
}
return null;
}
}
@Slf4j
@Provider
@ServerInterceptor
public class ResteasyPostInterceptor implements PostProcessInterceptor {
@Override
public void postProcess(ServerResponse serverResponse) {
try{
MDC.clear();
}catch (RuntimeException e){
log.error("resteasy服務端log跟蹤后置攔截器執行異常",e);
}
}
}
clover場景
clover的大體機制主要是在項目啟動的時候掃描到帶有注解@HessianWebService的類進行服務注冊并維持心跳檢測,而clover端則通過servlet請求方式進行任務的回調,同時繼承AbstractScheduleTaskProcess方式的任務是以線程池的方式進行業務的處理
基于上述原理我們需要解決兩個問題:1.新建一個類繼承ServiceExporterServlet,并在web.xml配置中進行servlet配置,代碼如下;
@Slf4j
public class ServiceExporterTraceIdServlet extends ServiceExporterServlet {
@Override
public void service(ServletRequest req, ServletResponse res) throws ServletException, IOException {
try {
String traceId = MDC.get("traceId");
if (StringUtils.isBlank(traceId)) {
MDC.put("traceId", TraceUtils.getTraceId());
}
} catch (Exception e) {
log.error("clover請求servlet執行異常", e);
}
try {
super.service(req, res);
} catch (Throwable e) {
log.error("clover請求servlet執行異常", e);
throw e;
}finally {
try{
MDC.clear();
}catch (RuntimeException ex){
log.error("clover請求servlet執行異常",ex);
}
}
}
}
2.新建一個抽象類繼承AbstractScheduleTaskProcess,在類中以編碼形式進行父子線程的透傳(可優化:通過覆寫MDCAdapter:通過阿里的TransmittableThreadLocal來解決父子線程傳遞問題),所有任務均改為繼承此類,關鍵代碼如下
try{
traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
if (StringUtils.isBlank(traceId)) {
log.warn("clover自定義log跟蹤攔截器預警,mdc沒有traceId");
}
}catch (RuntimeException e){
log.error("clover自定義log跟蹤攔截器執行異常",e);
}
final String logTraceId = traceId;
while(iterator.hasNext()) {
final List list = (List)iterator.next();
this.executor.submit(new Callable() {
public Object call() throws Exception {
try{
if (StringUtils.isNotBlank(logTraceId)) {
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, logTraceId);
}
}catch (RuntimeException e){
log.error("clover自定義log跟蹤攔截器執行異常",e);
}
Object var1;
try {
if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) {
BaseTcTaskProcessWorker.logger.info("正在執行任務[" + this.getClass().getName() + "],條數:" + list.size() + "...");
}
BaseTcTaskProcessWorker.this.executeTasks(list);
if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) {
BaseTcTaskProcessWorker.logger.info("執行任務[" + this.getClass().getName() + "],條數:" + list.size() + "成功!");
}
var1 = null;
} catch (Exception var5) {
BaseTcTaskProcessWorker.logger.error(var5.getMessage(), var5);
throw var5;
} finally {
try{
MDC.clear();
}catch (RuntimeException ex){
log.error("clover自定義log跟蹤攔截器執行異常",ex);
}
latch.countDown();
}
return var1;
}
});
}
easyjob場景
easyjob的大體機制是在項目啟動的時候通過掃描實現接口Scheduler的類進行上報注冊,同時啟動一個acceptor(獲取任務的線程池),而acceptor拉取到任務后會將父任務放進一個叫executor的線程池,子任務范進一個叫slowExecutor的線程池,我們可以新建一個抽獎類實現接口ScheduleFlowTask,復用clover場景硬編碼方式進行父子線程logTraceId的透傳處理(可優化:通過覆寫MDCAdapter:通過阿里的TransmittableThreadLocal來解決父子線程傳遞問題),示例代碼如下
?
@Slf4j
public abstract class AbstractEasyjobOnlyScheduleProcess implements ScheduleFlowTask {
/**
* EASYJOB平臺UMP監控key前綴
*/
private static final String EASYJOB_UMP_KEY_RREFIX = "trans.easyjob.dotask.";
/**
* EASYJOB單個任務處理分布式鎖前綴
*/
private static final String EASYJOB_SINGLE_TASK_LOCK_PREFIX = "basic_easyjob_single_task_lock_prefix_";
/**
* 環境標識-開關配置進行環境隔離
*/
@Value("${spring.profiles.active}")
private String activeEnv;
@Value("${task.scene.mark}")
private String sceneMark = TaskSceneMarkEnum.PRODUCTION.getDesc();
/**
* easyJob維度線程池變量
*/
private ThreadPoolExecutor easyJobExecutor;
/**
* easyJob維度服務器個數-分片個數
*/
private volatile int easyJobLastThreadCount = 0;
/**
* easyjob多線程名稱
*/
private static final String EASYJOB_THREAD_NAME = "dts.easyJobs";
/**
* 子類的泛型參數類型
*/
private Class argumentType;
/**
* 無參構造
*/
public AbstractEasyjobOnlyScheduleProcess() {
//設置子類泛型參數類型
argumentType = this.getArgumentType();
}
@Autowired
private RedisHelper redisHelper;
/**
* 非task表掃描待處理的任務數據
* @param taskServerParam
* @param curServer
* @return
*/
protected abstract List loadTasks(TaskServerParam taskServerParam, int curServer);
/**
* 業務處理抽象方法-單個
* @param task
*/
protected abstract void doSingleTask(T task);
/**
* 業務處理抽象方法-批量
* @param tasks
*/
protected abstract void doBatchTasks(List tasks);
/**
* 拼裝ump監控key
* @param prefix
* @param taskNameKey
* @return
*/
private String getUmpKey(String prefix,String taskNameKey) {
StringBuffer umpKeyBuf = new StringBuffer();
umpKeyBuf.append(prefix).append(taskNameKey);
return umpKeyBuf.toString();
}
/**
* easyjob平臺異步任務回調方法
* @param scheduleContext
* @return
* @throws Exception
*/
@Override
public TaskResult doTask(ScheduleContext scheduleContext) throws Exception {
String requestNo = TraceUtils.getTraceId();
try {
String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
if (StringUtils.isBlank(traceId)) {
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo);
}
} catch (Exception e) {
log.error("easyjob執行異常", e);
}
EasyJobTaskServerParam taskServerParam = null;
CallerInfo callerinfo = null;
try {
//條件轉換
taskServerParam = EasyJobCoreUtil.transTaskServerParam(scheduleContext);
String taskNameKey = getTaskNameKey();
String umpKey = getUmpKey(EASYJOB_UMP_KEY_RREFIX,taskNameKey);
callerinfo = Profiler.registerInfo(umpKey, Constants.TRANS_BASIC, false, true);
//多服務器,并且非子任務,本次不執行,提交子任務
if (taskServerParam.getServerCount() > 1 && !taskServerParam.isSubTask()) {
submitSubTask(scheduleContext, taskServerParam,requestNo);
return TaskResult.success();
}
if (log.isInfoEnabled()) {
log.info("請求編號[{}],開始獲取任務,任務ID[{}],任務名稱[{}],執行參數[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), JSON.toJSONString(taskServerParam));
}
TaskServerParam cloverTaskServerParam = EasyJobCoreUtil.transferCloverTaskServerParam(taskServerParam);
List tasks = this.selectTasks(cloverTaskServerParam, taskServerParam.getCurServer());
if (log.isInfoEnabled()) {
log.info("請求編號[{}],獲取任務ID[{}],任務名稱[{}]共{}條", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks == null ? 0 : tasks.size());
}
if (CollectionUtils.isNotEmpty(tasks)) {
if (log.isInfoEnabled()) {
log.info("請求編號[{}],開始執行任務,任務ID[{}],任務名稱[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName());
}
this.easyJobExecuteTasksInner(taskServerParam, tasks,requestNo);
if (log.isInfoEnabled()) {
log.info("請求編號[{}],執行任務,任務ID[{}],任務名稱[{}],執行數量[{}]完成....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks.size());
}
}
return TaskResult.success();
} catch (Exception e) {
Profiler.functionError(callerinfo);
if (log.isInfoEnabled()) {
log.error("請求編號[{}],任務執行失敗,任務ID[{}],任務名稱[{}]", requestNo, taskServerParam == null ? "" : taskServerParam.getTaskId(), taskServerParam == null ? "" :taskServerParam.getTaskName(), e);
}
return TaskResult.fail(e.getMessage());
}finally {
try{
MDC.clear();
}catch (RuntimeException ex){
log.error("easyjob執行異常",ex);
}
Profiler.registerInfoEnd(callerinfo);
}
}
/**
* 多分片提交子任務
* @param scheduleContext 調度任務上下文參數
* @param taskServerParam 調度任務參數
* @param requestNo 調度任務參數
* @return void
*/
private void submitSubTask(ScheduleContext scheduleContext, EasyJobTaskServerParam taskServerParam,String requestNo) throws IOException {
log.info("請求編號[{}],執行任務,任務ID[{}],任務名稱[{}],子任務個數[{}],開始提交子任務", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount());
String jobClass = scheduleContext.getTaskGetResponse().getJobClass();
if (StringUtils.isBlank(jobClass)) {
throw new RuntimeException("jobClass get error");
}
for (int i = 0; i < taskServerParam.getServerCount(); i++) {
Map dataMap = scheduleContext.getParameters();
//提交子任務標識
dataMap.put("isSubTask", "true");
//給子任務進行編號
dataMap.put("curServer", String.valueOf(i));
//父任務名稱傳遞子任務
dataMap.put("taskName", taskServerParam.getTaskName());
scheduleContext.commitSubTask(jobClass, dataMap, taskServerParam.getExpected(), taskServerParam.getTransactionalAccept());
}
// 父任務等待子任務執行完畢再更改狀態,如果執行時間超過等待時間,拋異常
//scheduleContext.waitForSubtaskCompleted((long) taskServerParam.getServerCount() * taskServerParam.getExpected());
log.info("請求編號[{}],執行任務,任務ID[{}],任務名稱[{}],子任務個數[{}],提交完成....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount());
}
/**
* 創建線程池,按配置參數執行task
* @param param 執行參數
* @param tasks 任務集合
* @param requestNoStr
* @return void
*/
private void easyJobExecuteTasksInner(final EasyJobTaskServerParam param, List tasks,String requestNoStr) {
int threadCount = param.getThreadCount();
synchronized (this) {
if (this.easyJobExecutor == null) {
this.easyJobExecutor = (ThreadPoolExecutor) EasyJobCoreUtil.createCustomeasyJobExecutorService(threadCount, EASYJOB_THREAD_NAME);
this.easyJobLastThreadCount = threadCount;
} else if (threadCount > this.easyJobLastThreadCount) {
this.easyJobExecutor.setMaximumPoolSize(threadCount);
this.easyJobExecutor.setCorePoolSize(threadCount);
this.easyJobLastThreadCount = threadCount;
} else if (threadCount < this.easyJobLastThreadCount) {
this.easyJobExecutor.setCorePoolSize(threadCount);
this.easyJobExecutor.setMaximumPoolSize(threadCount);
this.easyJobLastThreadCount = threadCount;
}
}
List> lists = Lists.partition(tasks, param.getExecuteCount());
final CountDownLatch latch = new CountDownLatch(lists.size());
final String requestNo = requestNoStr;
for (final List list : lists) {
this.easyJobExecutor.submit(
new Callable() {
public Object call() throws Exception {
try{
if (StringUtils.isNotBlank(requestNo)) {
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo);
}
}catch (RuntimeException e){
log.error("easyjob自定義log跟蹤攔截器執行異常",e);
}
try {
if (log.isInfoEnabled()) {
log.info("請求編號[{}],正在執行任務,任務ID[{}],任務名稱[{}],[{}],條數:[{}]...", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());
}
executeTasks(list);
if (log.isInfoEnabled()) {
log.info("請求編號[{}],執行任務,任務ID[{}],任務名稱[{}],[{}],條數:[{}]成功!", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());
}
} catch (Exception e) {
log.error(e.getMessage(), e);
throw e;
} finally {
try{
MDC.clear();
}catch (RuntimeException ex){
log.error("easyjob自定義log跟蹤攔截器執行異常",ex);
}
latch.countDown();
}
return null;
}
}
);
}
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException("interrupted when processing data access request in concurrency", e);
}
}
/**
* 獲取任務名稱
* @return
*/
private String getTaskNameKey(){
StringBuffer keyBuf = new StringBuffer();
keyBuf.append(activeEnv)
.append(Constants.SEPARATOR_UNDERLINE)
.append(this.getClass().getSimpleName());
return keyBuf.toString();
}
protected void executeTasks(List taskList) {
if(CollectionUtils.isEmpty(taskList)) {
return;
}
this.doTasks(taskList);
}
/**
* 業務處理抽象方法
* @param list
*/
protected void doTasks(List list){
if(isDoBatchTasks()){
CallerInfo info = Profiler.registerInfo(getClass().getName()+"_batch", Constants.TRANS_BASIC,false, true);
try {
/** 開始執行各個子類真正業務邏輯 */
this.doBatchTasks(list);
} catch(CommonBusinessException ex){
log.warn(ex.getMessage());
} catch (Exception e) {
Profiler.functionError(info);
log.error("任務處理失敗,方法:{},任務:{}",ClassHelper.getMethod(),JSON.toJSONString(list), e);
} finally {
Profiler.registerInfoEnd(info);
}
}else{
for (T task : list) {
CallerInfo info = Profiler.registerInfo(getClass().getName(), Constants.TRANS_BASIC,false, true);
if(task == null) { continue; }
String lockKey = "";
try {
/** 開始執行各個子類真正業務邏輯 */
if (useConcurrentLock()) {
lockKey = getLockKey(task);
if (redisHelper.lock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey)) {
this.doSingleTask(task);
}else{
lockKey = "";
log.warn("lockKey:{},加載失敗,正在被其他用戶鎖定,請重試!",lockKey);
}
} else {
this.doSingleTask(task);
}
} catch(CommonBusinessException ex){
log.warn(ex.getMessage());
} catch (Exception e) {
Profiler.functionError(info);
log.error("任務處理失敗,方法:{},任務:{}",ClassHelper.getMethod(),JSON.toJSONString(task), e);
} finally {
Profiler.registerInfoEnd(info);
if (StringUtils.isNotBlank(lockKey)) {
redisHelper.unlock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey);
}
}
}
}
}
/**
* 獲取實體類的實際類型
*
* @return
*/
private Class getArgumentType() {
return (Class) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
}
/**
* 是否使用防并發鎖
* 默認不使用,如需使用子類重寫該方法
* @return
*/
protected boolean useConcurrentLock() {
return false;
}
/**
* 根所注解獲取LockKey,可被子類重寫,提高效率
*
* @param businessObj 業務對象
* @return concurrent lock key
*/
protected String getLockKey( T businessObj) {
StringBuilder lockKey = new StringBuilder(EASYJOB_SINGLE_TASK_LOCK_PREFIX);
//若存在注解指定的防重字段,則使用這些字段拼裝防重Key,否則使用MQ業務主鍵防重
List valueEntries = getAnnotaionConcurrentKeys(businessObj);
if (!CollectionUtils.isEmpty(valueEntries)) {
for (ValueEntryInfo valueEntry : valueEntries) {
lockKey.append(Constants.SEPARATOR_UNDERLINE);
lockKey.append(valueEntry.getValue());
}
} else {
throw new CommonBusinessException(String.format("此任務處理需要加分布式鎖,但是未設置鎖key,所以不做業務處理,請檢查,任務信息:%s",JSON.toJSONString(businessObj)));
}
return lockKey.toString();
}
/**
* 查找對象的ConccurentKey注解,獲取防重字段,并排序返回
*
* @param businessObj 業務對象
* @return 有序的業務字段值列表
*/
private List getAnnotaionConcurrentKeys(T businessObj) {
List valueEntries = new ArrayList();
Field[] fields = businessObj.getClass().getDeclaredFields();
for (int i = 0; i < fields.length; i++) {
ConcurrentKey concurrentKey = fields[i].getAnnotation(ConcurrentKey.class);
if (concurrentKey != null) {
fields[i].setAccessible(true);
Object fieldVal = null;
try {
ValueEntryInfo valueEntry = new ValueEntryInfo();
fieldVal = fields[i].get(businessObj);
if (fieldVal != null) {
valueEntry.setValue(String.format("%1$s", fieldVal));
valueEntry.setOrder(concurrentKey.order());
valueEntries.add(valueEntry);
}
} catch (IllegalAccessException e) {
log.error("IllegalAccess-{}.{}", businessObj.getClass().getName(), fields[i].getName());
}
}
}
if (valueEntries.size() > 1) {
//排序ConcurrentKey
Collections.sort(valueEntries, new Comparator() {
@Override
public int compare(ValueEntryInfo o1, ValueEntryInfo o2) {
if (o1.getOrder() > o2.getOrder()) {
return 1;
} else if (o1.getOrder() == o2.getOrder()) {
return 0;
} else {
return -1;
}
}
});
}
return valueEntries;
}
protected List selectTasks(TaskServerParam taskServerParam, int curServer) {
return this.loadTasks(taskServerParam, curServer);
}
/**
* 獲取select時的任務創建開始時間
* @param serverArg
* @return
*/
protected Date getCreateTimeFrom(String serverArg){
return null;
}
/**
* 是否以批量方式處理任務
* @return
*/
protected boolean isDoBatchTasks(){
return false;
}
}
實戰結果
上述所述均為透傳ID場景的原理和示例代碼,實戰效果如下圖:調用jsf超時,跨系統查看日志進行排查,得知為慢sql引起
上述大部分場景已經抽出一個通用jar包,詳細使用教程見我的另一篇文章:分布式日志追蹤ID使用教程
審核編輯 黃宇
-
分布式
+關注
關注
1文章
923瀏覽量
74608 -
JSF
+關注
關注
0文章
12瀏覽量
7763 -
過濾器
+關注
關注
1文章
432瀏覽量
19719
發布評論請先 登錄
相關推薦
評論