色哟哟视频在线观看-色哟哟视频在线-色哟哟欧美15最新在线-色哟哟免费在线观看-国产l精品国产亚洲区在线观看-国产l精品国产亚洲区久久

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

分布式日志追蹤ID實戰

京東云 ? 來源:京東物流 張小龍 ? 作者:京東物流 張小龍 ? 2025-01-20 10:16 ? 次閱讀

作者:京東物流 張小龍

本文通過介紹分布式應用下各個場景的全局日志ID透傳思路,以及介紹分布式日志追蹤ID簡單實現原理和實戰效果,從而達到通過提高日志查詢排查問題的效率。

背景

開發排查系統問題用得最多的手段就是查看系統日志,相信不少人都值過班當過小秘吧:給下接口和出入參吧,麻煩看看日志里的有沒有異常信息啊等等,但是在并發大時使用日志定位問題還是比較麻煩,由于大量的其他用戶/其他線程的日志也一起輸出穿行其中導致很難篩選出指定請求的全部相關日志,以及下游線程/服務對應的日志,甚至一些特殊場景的出入參只打印了一些諸如gis坐標、四級地址等沒有單據信息的日志,使得日志定位起來非常不便

場景分析

自己所在組負責的系統主要是web應用,其中涉及到的請求方式主要有:springmvc的servlet的http場景、jsf場景、MQ場景、resteasy場景、clover場景、easyjob場景,每一種場景都需要不同的方式進行logTraceId的透傳,接下來逐個探析上述各個場景的透傳方案。

在這之前我們先要簡單了解一下日志中透傳和打印logTraceId的方式,一般我們使用MDC進行logTraceId的透傳與打印,但是基于MDC內部使用的是ThreadLocal所以只有本線程才有效,子線程服務的MDC里的值會丟失,所以這里我們要么是在所有涉及到父子線程的地方以編碼侵入式自行實現值的傳遞,要么就是通過覆寫MDCAdapter:通過阿里的TransmittableThreadLocal來解決父子線程傳遞問題,而本文采用的是比較粗糙地以編碼侵入式來解決此問題。

springmvc的servlet的http場景

這個場景相信大家都已經爛熟到骨子里了,主要思路是通過攔截器的方式進行logTraceId的透傳,新建一個類實現HandlerInterceptor

preHandle:在業務處理器處理請求之前被調用,這里實現logTraceId的設置與透傳

postHandle:在業務處理器處理請求執行完成后,生成視圖之前執行,這里空實現就好

afterCompletion:在DispatcherServlet完全處理完請求后被調用,這里用于清除MDC的logTraceId

@Slf4j
public class TraceInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object o) throws Exception {
        try{
            String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if (StringUtils.isBlank(traceId)) {
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, TraceUtils.getTraceId());
            }
        }catch (RuntimeException e){
            log.error("mvc自定義log跟蹤攔截器執行異常",e);
        }
        return true;
    }

    @Override
    public void postHandle(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
    }

    @Override
    public void afterCompletion(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
        try{
            MDC.clear();
        }catch (RuntimeException ex){
            log.error("mvc自定義log跟蹤攔截器執行異常",ex);
        }
    }
}

jsf場景

相信大家對于jsf并不陌生,而jsf也支持自定義filter,基于jsf過濾器的運行方式,可以通過配置全局過濾器(繼承AbstractFilter)的方式進行logTraceId的透傳,需要注意的是jsf是在線程池中執行的所以一定要信任消息體中的logTraceId

jsf消費者過濾器:主要從上下文環境中獲取logTraceId并進行透傳,實現代碼如下

@Slf4j
public class TraceIdGlobalJsfFilter extends AbstractFilter {
    @Override
    public ResponseMessage invoke(RequestMessage requestMessage) {
        //設置traceId
        setAndGetTraceId(requestMessage);
        try{
            return this.getNext().invoke(requestMessage);
        }finally {
        }
    }

    /**
     * 設置并返回traceId
     * @param requestMessage
     * @return
     */
    private void setAndGetTraceId(RequestMessage requestMessage) {
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){
                //如果filter和MDC都沒有獲取到則說明有遺漏,打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf消費者自定義log跟蹤攔截器預警,filter和MDC都沒有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            } else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) {
                //如果MDC沒有,filter有,打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf消費者自定義log跟蹤攔截器預警,MDC沒有filter有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            } else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){
                //如果MDC有,filter沒有,說明是源頭已經有了,但是jsf是第一次調,透傳
                requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, logTraceId);
            }else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null){
                //MDC和fitler都有,但是并不相等,則存在問題打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf消費者自定義log跟蹤攔截器預警,MDC和filter都有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            }
        }catch (RuntimeException e){
            log.error("jsf消費者自定義log跟蹤攔截器執行異常",e);
        }
    }
}

jsf提供者過濾器:通過拿到消費者在消息體中透傳的logTraceId來實現,實現代碼如下

@Slf4j
public class TraceIdGlobalJsfProducerFilter extends AbstractFilter {
    @Override
    public ResponseMessage invoke(RequestMessage requestMessage) {
        //設置traceId
        boolean isNeedClearMdc = transferTraceId(requestMessage);
        try{
            return this.getNext().invoke(requestMessage);
        }finally {
            if(isNeedClearMdc){
                clear();
            }
        }
    }
    /**
     * 設置并返回traceId
     * @param requestMessage
     * @return
     */
    private boolean transferTraceId(RequestMessage requestMessage) {
        boolean isNeedClearMdc = false;
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){
                //如果filter和MDC都沒有獲取到,說明存在遺漏場景或是提供給外部系統調用的接口,打印日志進行觀察
                String traceId = TraceUtils.getTraceId();
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,traceId);
                requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, traceId);
                if(log.isDebugEnabled()){
                    log.debug("jsf生產者自定義log跟蹤攔截器預警,filter和MDC都沒有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
                isNeedClearMdc = true;
            } else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) {
                //如果MDC沒有,filter有,說明是被調用方,需要透傳下去
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceIdObj.toString());
                isNeedClearMdc = true;
            } else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){
                //如果MDC有,filter沒有,存在問題,打印日志
                if(log.isDebugEnabled()){
                    log.debug("jsf生產者自定義log跟蹤攔截器預警,MDC有filter沒有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
                isNeedClearMdc = true;
            }else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null && !logTraceId.equals(logTraceIdObj.toString())){
                //MDC和fitler都有,但是并不相等,則信任filter透傳結果
                TraceUtils.resetTraceId(logTraceIdObj.toString());
                if(log.isDebugEnabled()){
                    log.debug("jsf生產者自定義log跟蹤攔截器預警,MDC和fitler都有traceId,但是并不相等,jsf信息:{}", JSON.toJSONString(requestMessage));
                }
            }
            return isNeedClearMdc;
        }catch (RuntimeException e){
            log.error("jsf生產者自定義log跟蹤攔截器執行異常",e);
            return false;
        }
    }

    /**
     * 清除MDC
     */
    private void clear() {
        try{
            MDC.clear();
        }catch (RuntimeException e){
            log.error("jsf生產者自定義log跟蹤攔截器執行異常",e);
        }
    }
}

MQ場景

說到MQ相信大家對于此就更不陌生了,此種場景主要通過在提供者發送消息時拿到上下文中的logTraceId,將其以擴展信息的方式設置進消息體中進行透傳,而消費者則從消息體中進行獲取

生產者:新建一個抽象類繼承MessageProducer,覆寫父類中的兩個send方法(批量發送、單條發送),send方法中主要調用抽象加工消息體的方法(logTraceId屬性賦值)和日志打印,在子類中進行發送前對消息體的加工處理,具體代碼如下

@Slf4j
public abstract class BaseTraceIdProducer extends MessageProducer {

    private static final String SEPARATOR_COMMA = ",";

    public BaseTraceIdProducer() {
    }

    public BaseTraceIdProducer(TransportManager transportManager) {
        super(transportManager);
    }

    /**
     * 獲取消息體-單個
     * @param messageContext
     * @return
     */
    protected abstract Message getMessage(MessageContext messageContext);

    /** 獲取消息體-批量
     *
     * @param messageContext
     * @return
     */
    protected abstract List getMessages(MessageContext messageContext);

    /**
     * 填充消息體上下文信息
     * @param message
     * @param messageContext
     */
    protected void fillContext(Message message,MessageContext messageContext) {
        if(message == null){
            return;
        }
        if(StringUtils.isBlank(messageContext.getLogTraceId())){
            String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY);
            messageContext.setLogTraceId(logTraceId);
        }
        if(StringUtils.isBlank(messageContext.getTopic())){
            String topic = message.getTopic();
            messageContext.setTopic(topic);
        }
        String businessId = message.getBusinessId();
        messageContext.getBusinessIdBuf().append(SEPARATOR_COMMA).append(businessId);
    }

    /**
     * traceId嵌入消息體中
     * @param message
     */
    protected void generateTraceIdIntoMessage(Message message){
        if(message == null){
            return;
        }
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId)){
                logTraceId = TraceUtils.getTraceId();
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId);
            }
            message.setAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY,logTraceId);
        }catch (RuntimeException e){
            log.error("jmq2自定義log跟蹤攔截器執行異常",e);
        }
    }

    /**
     * 批量發送消息-無回調
     * @param messages
     * @param timeout
     * @throws JMQException
     */
    public void send(List messages, int timeout) throws JMQException {
        MessageContext messageContext = new MessageContext();
        messageContext.setMessages(messages);
        List messageList = this.getMessages(messageContext);
        //打印日志,方便排查問題
        printLog(messageContext);
        super.send(messageList, timeout);
    }

    /**
     * 單個發送消息
     * @param message
     * @param transaction
     * @param 
     * @return
     * @throws JMQException
     */
    public  T send(Message message, LocalTransaction transaction) throws JMQException {
        MessageContext messageContext = new MessageContext();
        messageContext.setMessage(message);
        Message msg = this.getMessage(messageContext);
        //打印日志,方便排查問題
        printLog(messageContext);
        return super.send(msg, transaction);
    }

    /**
     * 批量發送消息-有回調
     * @param messages
     * @param timeout
     * @param callback
     * @throws JMQException
     */
    public void send(List messages, int timeout, AsyncSendCallback callback) throws JMQException {
        MessageContext messageContext = new MessageContext();
        messageContext.setMessages(messages);
        List messageList = this.getMessages(messageContext);
        //打印日志,方便排查問題
        printLog(messageContext);
        super.send(messageList, timeout, callback);
    }

    /**
     * 打印日志,方便排查問題
     * @param messageContext
     */
    private void printLog(MessageContext messageContext) {
        if(messageContext==null){
            return;
        }
        if(log.isInfoEnabled()){
            log.info("MQ發送:traceId:{},topic:{},businessIds:[{}]",messageContext.getLogTraceId(),messageContext.getTopic(),messageContext.getBusinessIdBuf()==null?"":messageContext.getBusinessIdBuf().toString());
        }
    }

}

@Slf4j
public class TraceIdEnvMessageProducer extends BaseTraceIdProducer {

    private static final String UAT_TRUE = String.valueOf(true);
    private boolean uat = false;

    public TraceIdEnvMessageProducer() {
    }

    public TraceIdEnvMessageProducer(TransportManager transportManager) {
        super(transportManager);
    }

    /**
     * 環境變量打標-單個消息體
     * @param message
     */
    private void convertUatMessage(Message message) {
        if (message != null) {
            message.setAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT, UAT_TRUE);
        }
    }


    /**
     * 消息轉換-批量消息體
     * @param messageContext
     * @return
     */
    private List convertMessages(MessageContext messageContext) {
        List messages = messageContext.getMessages();
        if (!CollectionUtils.isEmpty(messages)) {
            Iterator messageIterator = messages.iterator();
            while(messageIterator.hasNext()) {
                Message message = (Message)messageIterator.next();
                if(this.isUat()){
                    this.convertUatMessage(message);
                }
                super.generateTraceIdIntoMessage(message);
                super.fillContext(message,messageContext);
            }
        }
        return messageContext.getMessages();
    }

    /**
     * 消息轉換-單個消息體
     * @param messageContext
     * @return
     */
    private Message convertMessage(MessageContext messageContext){
        Message message = messageContext.getMessage();
        if(this.isUat()){
            this.convertUatMessage(message);
        }
        super.generateTraceIdIntoMessage(message);
        super.fillContext(message,messageContext);
        return message;
    }

    protected Message getMessage(MessageContext messageContext) {
        if(log.isDebugEnabled()){
            log.debug("current environment is UAT : {}", this.isUat());
        }
        return this.convertMessage(messageContext);
    }

    protected List getMessages(MessageContext messageContext) {
        if(log.isDebugEnabled()){
            log.debug("current environment is UAT : {}", this.isUat());
        }
        return this.convertMessages(messageContext);
    }

    public void setUat(boolean uat) {
        this.uat = uat;
    }

    boolean isUat() {
        return this.uat;
    }

}

消費者:新建一個抽象類繼承MessageListener,覆寫父類中的onMessage方法,主要進行設置日志traceId和消費完成后的traceId清理等,而在子類中進行一些自定義處理,具體代碼如下

@Slf4j
public abstract class BaseTraceIdMessageListener implements MessageListener {

    public BaseTraceIdMessageListener() {
    }

    public abstract void onMessageList(List messages) throws Exception;

    @Override
    public final void onMessage(List messages) throws Exception {
        try{
            if(CollectionUtils.isEmpty(messages)){
                return;
            }
            //設置日志traceId
            setLogTraceId(messages);
            this.onMessageList(messages);
            //消費完后清除traceId
            clear();
        }catch (Exception e){
            throw e;
        }finally {
            MDC.clear();
        }
    }

    /**
     * 設置日志traceId
     * @param messages
     */
    private void setLogTraceId(List messages) {
        try{
            Message message = messages.get(0);
            String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId)){
                logTraceId = TraceUtils.getTraceId();
            }
            MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId);
        }catch (RuntimeException e){
            log.error("jmq2自定義log跟蹤攔截器執行異常",e);
        }
    }

    /**
     * 清除traceId
     */
    private void clear() {
        try{
            MDC.clear();
        }catch (RuntimeException e){
            log.error("jmq2自定義log跟蹤攔截器執行異常",e);
        }
    }

}

@Slf4j
public abstract class TraceIdEnvMessageListener extends BaseTraceIdMessageListener{

    private String uat;

    public TraceIdEnvMessageListener() {
    }

    public abstract void onMessages(List var1) throws Exception;

    @Override
    public void onMessageList(List messages) throws Exception {
        Iterator iterator;
        Message message;
        if (this.getUat() != null && Boolean.valueOf(this.getUat())) {
            iterator = messages.iterator();

            while(true) {
                while(iterator.hasNext()) {
                    message = (Message)iterator.next();
                    if (message != null && Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) {
                        this.onMessages(Arrays.asList(message));
                    } else {
                        log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());
                    }
                }

                return;
            }
        } else if (this.getUat() != null && !Boolean.valueOf(this.getUat())) {
            iterator = messages.iterator();

            while(true) {
                while(iterator.hasNext()) {
                    message = (Message)iterator.next();
                    if (message != null && !Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) {
                        this.onMessages(Arrays.asList(message));
                    } else {
                        log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());
                    }
                }

                return;
            }
        } else {
            this.onMessages(messages);
        }
    }

    public void setUat(String uat) {
        if (!"true".equals(uat) && !"false".equals(uat)) {
            throw new IllegalArgumentException("uat 屬性值只能為 true 或 false.");
        } else {
            this.uat = uat;
        }
    }

    public String getUat() {
        return this.uat;
    }
}

resteasy場景

此場景類似于spinrg-mvc場景,也是http請求,需要通過攔截器在消息頭中進行logTraceId的透傳,主要有客戶端攔截器,服務端:預處理攔截器、后置攔截器,代碼如下

@ClientInterceptor
@Provider
@Slf4j
public class ResteasyClientInterceptor implements ClientExecutionInterceptor {
    @Override
    public ClientResponse execute(ClientExecutionContext clientExecutionContext) throws Exception {
        try{
            String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            ClientRequest request = clientExecutionContext.getRequest();
            String headerTraceId = request.getHeaders().getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY);
            if(StringUtils.isBlank(logTraceId) && StringUtils.isBlank(headerTraceId)){
                //如果filter和MDC都沒有獲取到則說明是調用源頭
                String traceId = TraceUtils.getTraceId();
                TraceUtils.resetTraceId(traceId);
                request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,traceId);
            } else if(StringUtils.isBlank(headerTraceId)){
                //如果MDC有但是filter沒有則需要傳遞
                request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,logTraceId);
            }
        }catch (RuntimeException e){
            log.error("resteasy客戶端log跟蹤攔截器執行異常",e);
        }
        return clientExecutionContext.proceed();
    }
}

@Slf4j
@Provider
@ServerInterceptor
public class RestEasyPreInterceptor implements PreProcessInterceptor {
    @Override
    public ServerResponse preProcess(HttpRequest request, ResourceMethod resourceMethod) throws Failure, WebApplicationException {
        try{
            MultivaluedMap requestHeaders = request.getHttpHeaders().getRequestHeaders();
            String headerTraceId = requestHeaders.getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY);
            if(StringUtils.isNotBlank(headerTraceId)){
                //如果filter則透傳
                TraceUtils.resetTraceId(headerTraceId);
            }
        }catch (RuntimeException e){
            log.error("resteasy服務端log跟蹤前置攔截器執行異常",e);
        }
        return null;
    }
}

@Slf4j
@Provider
@ServerInterceptor
public class ResteasyPostInterceptor implements PostProcessInterceptor {
    @Override
    public void postProcess(ServerResponse serverResponse) {
        try{
            MDC.clear();
        }catch (RuntimeException e){
            log.error("resteasy服務端log跟蹤后置攔截器執行異常",e);
        }
    }
}

clover場景

clover的大體機制主要是在項目啟動的時候掃描到帶有注解@HessianWebService的類進行服務注冊并維持心跳檢測,而clover端則通過servlet請求方式進行任務的回調,同時繼承AbstractScheduleTaskProcess方式的任務是以線程池的方式進行業務的處理

基于上述原理我們需要解決兩個問題:1.新建一個類繼承ServiceExporterServlet,并在web.xml配置中進行servlet配置,代碼如下;

@Slf4j
public class ServiceExporterTraceIdServlet extends ServiceExporterServlet {

    @Override
    public void service(ServletRequest req, ServletResponse res) throws ServletException, IOException {
        try {
            String traceId = MDC.get("traceId");
            if (StringUtils.isBlank(traceId)) {
                MDC.put("traceId", TraceUtils.getTraceId());
            }
        } catch (Exception e) {
            log.error("clover請求servlet執行異常", e);
        }
        try {
            super.service(req, res);
        } catch (Throwable e) {
            log.error("clover請求servlet執行異常", e);
            throw e;
        }finally {
            try{
                MDC.clear();
            }catch (RuntimeException ex){
                log.error("clover請求servlet執行異常",ex);
            }
        }
    }
}

2.新建一個抽象類繼承AbstractScheduleTaskProcess,在類中以編碼形式進行父子線程的透傳(可優化:通過覆寫MDCAdapter:通過阿里的TransmittableThreadLocal來解決父子線程傳遞問題),所有任務均改為繼承此類,關鍵代碼如下

try{
            traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if (StringUtils.isBlank(traceId)) {
                log.warn("clover自定義log跟蹤攔截器預警,mdc沒有traceId");
            }
        }catch (RuntimeException e){
            log.error("clover自定義log跟蹤攔截器執行異常",e);
        }
        final String logTraceId = traceId;
        while(iterator.hasNext()) {
            final List list = (List)iterator.next();
            this.executor.submit(new Callable() {
                public Object call() throws Exception {
                    try{
                        if (StringUtils.isNotBlank(logTraceId)) {
                            MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, logTraceId);
                        }
                    }catch (RuntimeException e){
                        log.error("clover自定義log跟蹤攔截器執行異常",e);
                    }
                    Object var1;
                    try {
                        if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) {
                            BaseTcTaskProcessWorker.logger.info("正在執行任務[" + this.getClass().getName() + "],條數:" + list.size() + "...");
                        }


                        BaseTcTaskProcessWorker.this.executeTasks(list);

                        if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) {
                            BaseTcTaskProcessWorker.logger.info("執行任務[" + this.getClass().getName() + "],條數:" + list.size() + "成功!");
                        }

                        var1 = null;
                    } catch (Exception var5) {
                        BaseTcTaskProcessWorker.logger.error(var5.getMessage(), var5);
                        throw var5;
                    } finally {
                        try{
                            MDC.clear();
                        }catch (RuntimeException ex){
                            log.error("clover自定義log跟蹤攔截器執行異常",ex);
                        }
                        latch.countDown();
                    }

                    return var1;
                }
            });
        }

easyjob場景

easyjob的大體機制是在項目啟動的時候通過掃描實現接口Scheduler的類進行上報注冊,同時啟動一個acceptor(獲取任務的線程池),而acceptor拉取到任務后會將父任務放進一個叫executor的線程池,子任務范進一個叫slowExecutor的線程池,我們可以新建一個抽獎類實現接口ScheduleFlowTask,復用clover場景硬編碼方式進行父子線程logTraceId的透傳處理(可優化:通過覆寫MDCAdapter:通過阿里的TransmittableThreadLocal來解決父子線程傳遞問題),示例代碼如下

?

@Slf4j
public abstract class AbstractEasyjobOnlyScheduleProcess implements ScheduleFlowTask {

    /**
     * EASYJOB平臺UMP監控key前綴
     */
    private static final String EASYJOB_UMP_KEY_RREFIX = "trans.easyjob.dotask.";

    /**
     * EASYJOB單個任務處理分布式鎖前綴
     */
    private static final String EASYJOB_SINGLE_TASK_LOCK_PREFIX = "basic_easyjob_single_task_lock_prefix_";

    /**
     * 環境標識-開關配置進行環境隔離
     */
    @Value("${spring.profiles.active}")
    private String activeEnv;

    @Value("${task.scene.mark}")
    private String sceneMark = TaskSceneMarkEnum.PRODUCTION.getDesc();

    /**
     * easyJob維度線程池變量
     */
    private ThreadPoolExecutor easyJobExecutor;
    /**
     * easyJob維度服務器個數-分片個數
     */
    private volatile int easyJobLastThreadCount = 0;

    /**
     * easyjob多線程名稱
     */
    private static final String EASYJOB_THREAD_NAME = "dts.easyJobs";

    /**
     * 子類的泛型參數類型
     */
    private Class argumentType;

    /**
     * 無參構造
     */
    public AbstractEasyjobOnlyScheduleProcess() {
        //設置子類泛型參數類型
        argumentType = this.getArgumentType();
    }

    @Autowired
    private RedisHelper redisHelper;

    /**
     * 非task表掃描待處理的任務數據
     * @param taskServerParam
     * @param curServer
     * @return
     */
    protected abstract List loadTasks(TaskServerParam taskServerParam, int curServer);

    /**
     * 業務處理抽象方法-單個
     * @param task
     */
    protected abstract void doSingleTask(T task);

    /**
     * 業務處理抽象方法-批量
     * @param tasks
     */
    protected abstract void doBatchTasks(List tasks);

    /**
     * 拼裝ump監控key
     * @param prefix
     * @param taskNameKey
     * @return
     */
    private String getUmpKey(String prefix,String taskNameKey) {
        StringBuffer umpKeyBuf = new StringBuffer();
        umpKeyBuf.append(prefix).append(taskNameKey);
        return umpKeyBuf.toString();
    }

    /**
     * easyjob平臺異步任務回調方法
     * @param scheduleContext
     * @return
     * @throws Exception
     */
    @Override
    public TaskResult doTask(ScheduleContext scheduleContext) throws Exception {
        String requestNo = TraceUtils.getTraceId();
        try {
            String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
            if (StringUtils.isBlank(traceId)) {
                MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo);
            }
        } catch (Exception e) {
            log.error("easyjob執行異常", e);
        }
        EasyJobTaskServerParam taskServerParam = null;

        CallerInfo callerinfo = null;
        try {
            //條件轉換
            taskServerParam = EasyJobCoreUtil.transTaskServerParam(scheduleContext);
            String taskNameKey = getTaskNameKey();
            String umpKey = getUmpKey(EASYJOB_UMP_KEY_RREFIX,taskNameKey);
            callerinfo = Profiler.registerInfo(umpKey, Constants.TRANS_BASIC, false, true);
            //多服務器,并且非子任務,本次不執行,提交子任務
            if (taskServerParam.getServerCount() > 1 && !taskServerParam.isSubTask()) {
                submitSubTask(scheduleContext, taskServerParam,requestNo);
                return TaskResult.success();
            }

            if (log.isInfoEnabled()) {
                log.info("請求編號[{}],開始獲取任務,任務ID[{}],任務名稱[{}],執行參數[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), JSON.toJSONString(taskServerParam));
            }
            TaskServerParam cloverTaskServerParam = EasyJobCoreUtil.transferCloverTaskServerParam(taskServerParam);

            List tasks = this.selectTasks(cloverTaskServerParam, taskServerParam.getCurServer());

            if (log.isInfoEnabled()) {
                log.info("請求編號[{}],獲取任務ID[{}],任務名稱[{}]共{}條", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks == null ? 0 : tasks.size());
            }

            if (CollectionUtils.isNotEmpty(tasks)) {
                if (log.isInfoEnabled()) {
                    log.info("請求編號[{}],開始執行任務,任務ID[{}],任務名稱[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName());
                }

                this.easyJobExecuteTasksInner(taskServerParam, tasks,requestNo);
                if (log.isInfoEnabled()) {
                    log.info("請求編號[{}],執行任務,任務ID[{}],任務名稱[{}],執行數量[{}]完成....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks.size());
                }

            }
            return TaskResult.success();
        } catch (Exception e) {
            Profiler.functionError(callerinfo);
            if (log.isInfoEnabled()) {
                log.error("請求編號[{}],任務執行失敗,任務ID[{}],任務名稱[{}]", requestNo, taskServerParam == null ? "" : taskServerParam.getTaskId(), taskServerParam == null ? "" :taskServerParam.getTaskName(), e);
            }
            return TaskResult.fail(e.getMessage());
        }finally {
            try{
                MDC.clear();
            }catch (RuntimeException ex){
                log.error("easyjob執行異常",ex);
            }
            Profiler.registerInfoEnd(callerinfo);
        }
    }

    /**
     * 多分片提交子任務
     * @param scheduleContext 調度任務上下文參數
     * @param taskServerParam 調度任務參數
     * @param requestNo 調度任務參數
     * @return void
     */
    private void submitSubTask(ScheduleContext scheduleContext, EasyJobTaskServerParam taskServerParam,String requestNo) throws IOException {

        log.info("請求編號[{}],執行任務,任務ID[{}],任務名稱[{}],子任務個數[{}],開始提交子任務", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount());

        String jobClass = scheduleContext.getTaskGetResponse().getJobClass();

        if (StringUtils.isBlank(jobClass)) {
            throw new RuntimeException("jobClass get error");
        }

        for (int i = 0; i < taskServerParam.getServerCount(); i++) {
            Map dataMap = scheduleContext.getParameters();
            //提交子任務標識
            dataMap.put("isSubTask", "true");
            //給子任務進行編號
            dataMap.put("curServer", String.valueOf(i));
            //父任務名稱傳遞子任務
            dataMap.put("taskName", taskServerParam.getTaskName());
            scheduleContext.commitSubTask(jobClass, dataMap, taskServerParam.getExpected(), taskServerParam.getTransactionalAccept());
        }
        // 父任務等待子任務執行完畢再更改狀態,如果執行時間超過等待時間,拋異常
        //scheduleContext.waitForSubtaskCompleted((long) taskServerParam.getServerCount() * taskServerParam.getExpected());
        log.info("請求編號[{}],執行任務,任務ID[{}],任務名稱[{}],子任務個數[{}],提交完成....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount());
    }

    /**
     * 創建線程池,按配置參數執行task
     * @param param 執行參數
     * @param tasks 任務集合
     * @param requestNoStr
     * @return void
     */
    private void easyJobExecuteTasksInner(final EasyJobTaskServerParam param, List tasks,String requestNoStr) {
        int threadCount = param.getThreadCount();
        synchronized (this) {
            if (this.easyJobExecutor == null) {
                this.easyJobExecutor = (ThreadPoolExecutor) EasyJobCoreUtil.createCustomeasyJobExecutorService(threadCount, EASYJOB_THREAD_NAME);
                this.easyJobLastThreadCount = threadCount;
            } else if (threadCount > this.easyJobLastThreadCount) {
                this.easyJobExecutor.setMaximumPoolSize(threadCount);
                this.easyJobExecutor.setCorePoolSize(threadCount);
                this.easyJobLastThreadCount = threadCount;
            } else if (threadCount < this.easyJobLastThreadCount) {
                this.easyJobExecutor.setCorePoolSize(threadCount);
                this.easyJobExecutor.setMaximumPoolSize(threadCount);
                this.easyJobLastThreadCount = threadCount;
            }
        }

        List> lists = Lists.partition(tasks, param.getExecuteCount());
        final CountDownLatch latch = new CountDownLatch(lists.size());
        final String requestNo = requestNoStr;
        for (final List list : lists) {
            this.easyJobExecutor.submit(
                    new Callable() {
                        public Object call() throws Exception {
                            try{
                                if (StringUtils.isNotBlank(requestNo)) {
                                    MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo);
                                }
                            }catch (RuntimeException e){
                                log.error("easyjob自定義log跟蹤攔截器執行異常",e);
                            }
                            try {
                                if (log.isInfoEnabled()) {
                                    log.info("請求編號[{}],正在執行任務,任務ID[{}],任務名稱[{}],[{}],條數:[{}]...", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());
                                }
                                executeTasks(list);
                                if (log.isInfoEnabled()) {
                                    log.info("請求編號[{}],執行任務,任務ID[{}],任務名稱[{}],[{}],條數:[{}]成功!", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());
                                }
                            } catch (Exception e) {
                                log.error(e.getMessage(), e);
                                throw e;
                            } finally {
                                try{
                                    MDC.clear();
                                }catch (RuntimeException ex){
                                    log.error("easyjob自定義log跟蹤攔截器執行異常",ex);
                                }
                                latch.countDown();
                            }
                            return null;
                        }

                    }
            );
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException("interrupted when processing data access request in concurrency", e);
        }
    }

    /**
     * 獲取任務名稱
     * @return
     */
    private String getTaskNameKey(){
        StringBuffer keyBuf = new StringBuffer();
        keyBuf.append(activeEnv)
                .append(Constants.SEPARATOR_UNDERLINE)
                .append(this.getClass().getSimpleName());
        return keyBuf.toString();
    }

    protected void executeTasks(List taskList) {
        if(CollectionUtils.isEmpty(taskList)) {
            return;
        }
        this.doTasks(taskList);
    }

    /**
     * 業務處理抽象方法
     * @param list
     */
    protected void doTasks(List list){
        if(isDoBatchTasks()){
            CallerInfo info = Profiler.registerInfo(getClass().getName()+"_batch", Constants.TRANS_BASIC,false, true);
            try {
                /** 開始執行各個子類真正業務邏輯 */
                this.doBatchTasks(list);
            } catch(CommonBusinessException ex){
                log.warn(ex.getMessage());
            } catch (Exception e) {
                Profiler.functionError(info);
                log.error("任務處理失敗,方法:{},任務:{}",ClassHelper.getMethod(),JSON.toJSONString(list), e);
            } finally {
                Profiler.registerInfoEnd(info);
            }
        }else{
            for (T task : list) {
                CallerInfo info = Profiler.registerInfo(getClass().getName(), Constants.TRANS_BASIC,false, true);
                if(task == null) { continue; }
                String lockKey = "";
                try {
                    /** 開始執行各個子類真正業務邏輯 */
                    if (useConcurrentLock()) {
                        lockKey = getLockKey(task);
                        if (redisHelper.lock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey)) {
                            this.doSingleTask(task);
                        }else{
                            lockKey = "";
                            log.warn("lockKey:{},加載失敗,正在被其他用戶鎖定,請重試!",lockKey);
                        }
                    } else {
                        this.doSingleTask(task);
                    }
                } catch(CommonBusinessException ex){
                    log.warn(ex.getMessage());
                } catch (Exception e) {
                    Profiler.functionError(info);
                    log.error("任務處理失敗,方法:{},任務:{}",ClassHelper.getMethod(),JSON.toJSONString(task), e);
                } finally {
                    Profiler.registerInfoEnd(info);
                    if (StringUtils.isNotBlank(lockKey)) {
                        redisHelper.unlock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey);
                    }
                }
            }
        }
    }

    /**
     * 獲取實體類的實際類型
     *
     * @return
     */
    private Class getArgumentType() {
        return (Class) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }

    /**
     * 是否使用防并發鎖
     * 默認不使用,如需使用子類重寫該方法
     * @return
     */
    protected boolean useConcurrentLock() {
        return false;
    }

    /**
     * 根所注解獲取LockKey,可被子類重寫,提高效率
     *
     * @param businessObj   業務對象
     * @return concurrent lock key
     */
    protected String getLockKey( T businessObj) {
        StringBuilder lockKey = new StringBuilder(EASYJOB_SINGLE_TASK_LOCK_PREFIX);
        //若存在注解指定的防重字段,則使用這些字段拼裝防重Key,否則使用MQ業務主鍵防重
        List valueEntries = getAnnotaionConcurrentKeys(businessObj);
        if (!CollectionUtils.isEmpty(valueEntries)) {
            for (ValueEntryInfo valueEntry : valueEntries) {
                lockKey.append(Constants.SEPARATOR_UNDERLINE);
                lockKey.append(valueEntry.getValue());
            }
        } else {
           throw new CommonBusinessException(String.format("此任務處理需要加分布式鎖,但是未設置鎖key,所以不做業務處理,請檢查,任務信息:%s",JSON.toJSONString(businessObj)));
        }
        return lockKey.toString();
    }

    /**
     * 查找對象的ConccurentKey注解,獲取防重字段,并排序返回
     *
     * @param businessObj 業務對象
     * @return 有序的業務字段值列表
     */
    private List getAnnotaionConcurrentKeys(T businessObj) {
        List valueEntries = new ArrayList();
        Field[] fields = businessObj.getClass().getDeclaredFields();
        for (int i = 0; i < fields.length; i++) {
            ConcurrentKey concurrentKey = fields[i].getAnnotation(ConcurrentKey.class);
            if (concurrentKey != null) {
                fields[i].setAccessible(true);
                Object fieldVal = null;
                try {
                    ValueEntryInfo valueEntry = new ValueEntryInfo();
                    fieldVal = fields[i].get(businessObj);
                    if (fieldVal != null) {
                        valueEntry.setValue(String.format("%1$s", fieldVal));
                        valueEntry.setOrder(concurrentKey.order());
                        valueEntries.add(valueEntry);
                    }
                } catch (IllegalAccessException e) {
                    log.error("IllegalAccess-{}.{}", businessObj.getClass().getName(), fields[i].getName());
                }
            }
        }
        if (valueEntries.size() > 1) {
            //排序ConcurrentKey
            Collections.sort(valueEntries, new Comparator() {
                @Override
                public int compare(ValueEntryInfo o1, ValueEntryInfo o2) {
                    if (o1.getOrder() > o2.getOrder()) {
                        return 1;
                    } else if (o1.getOrder() == o2.getOrder()) {
                        return 0;
                    } else {
                        return -1;
                    }
                }
            });
        }
        return valueEntries;
    }

    protected List selectTasks(TaskServerParam taskServerParam, int curServer) {
        return this.loadTasks(taskServerParam, curServer);
    }

    /**
     * 獲取select時的任務創建開始時間
     * @param serverArg
     * @return
     */
    protected Date getCreateTimeFrom(String serverArg){
        return null;
    }

    /**
     * 是否以批量方式處理任務
     * @return
     */
    protected boolean isDoBatchTasks(){
        return false;
    }

}

實戰結果

上述所述均為透傳ID場景的原理和示例代碼,實戰效果如下圖:調用jsf超時,跨系統查看日志進行排查,得知為慢sql引起

上述大部分場景已經抽出一個通用jar包,詳細使用教程見我的另一篇文章:分布式日志追蹤ID使用教程

審核編輯 黃宇

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 分布式
    +關注

    關注

    1

    文章

    923

    瀏覽量

    74608
  • JSF
    JSF
    +關注

    關注

    0

    文章

    12

    瀏覽量

    7763
  • 過濾器
    +關注

    關注

    1

    文章

    432

    瀏覽量

    19719
收藏 人收藏

    評論

    相關推薦

    HarmonyOS實戰案例:【分布式賬本】

    Demo基于Open Harmony系統使用ETS語言進行編寫,本Demo主要通過設備認證、分布式拉起、分布式數據管理等功能來實現。
    的頭像 發表于 04-12 16:40 ?1389次閱讀
    HarmonyOS<b class='flag-5'>實戰</b>案例:【<b class='flag-5'>分布式</b>賬本】

    分布式軟件系統

    分布式軟件系統分布式軟件系統(Distributed Software Systems)是支持分布式處理的軟件系統,是在由通信網絡互聯的多處理機體系結構上執行任務的系統。它包括分布式
    發表于 07-22 14:53

    開放分布式追蹤(OpenTracing)入門與 Jaeger 實現

    改動。OpenTracing為了解決不同的分布式追蹤系統 API 不兼容的問題,誕生了 OpenTracing 規范。OpenTracing 是一個輕量級的標準化層,它位于應用程序/類庫和追蹤
    發表于 03-07 16:27

    基于分布式調用鏈監控技術的全息排查功能

    kevin.yang的鏈路信息,查詢示例如下:同時,ARMS還開放具體的全息排查事件查詢,方便用戶直接查看日志原始信息。如查詢username為kevin.yang的日志信息,查詢示例如下:結語作為國內分布式鏈路
    發表于 08-07 17:02

    分布式系統的優勢是什么?

    當討論分布式系統時,我們面臨許多以下這些形容詞所描述的 同類型: 分布式的、刪絡的、并行的、并發的和分散的。分布式處理是一個相對較新的領域,所以還沒有‘致的定義。與順序計算相比、并行的、并發的和
    發表于 03-31 09:01

    Hello HarmonyOS學習筆記:分布式新聞客戶端實戰(JS、eTS)

    源代碼下載地址:Codelabs: 分享知識與見解,一起探索HarmonyOS的獨特魅力。 - Gitee.com代碼講解視頻:華為開發者學堂-【Hello系列直播課】第5期:分布式新聞客戶端實戰
    發表于 06-23 20:08

    如何使用Jmeter進行分布式測試;檢索日志

    使用 Jmeter 進行分布式測試;檢索日志
    發表于 05-10 13:00

    網絡取證日志分布式安全管理

    提出了一種網絡取證日志分布式安全管理方法,通過日志代理和管理網關將分散的異構的日志收集并存儲到多個管理節點。該管理節點采用信息分配算法IDA將日志
    發表于 05-11 20:12 ?10次下載

    HarmonyOS測試技術與實戰-HarmonyOS分布式應用特征與挑戰

     HDC 2021華為開發者大會HarmonyOS測試技術與實戰-HarmonyOS分布式應用特征與挑戰
    的頭像 發表于 10-23 14:41 ?1726次閱讀
    HarmonyOS測試技術與<b class='flag-5'>實戰</b>-HarmonyOS<b class='flag-5'>分布式</b>應用特征與挑戰

    HarmonyOS測試技術與實戰-分布式應用測試解決方案

    HDC 2021華為開發者大會HarmonyOS測試技術與實戰-HarmonyOS分布式應用測試解決方案
    的頭像 發表于 10-23 14:48 ?1634次閱讀
    HarmonyOS測試技術與<b class='flag-5'>實戰</b>-<b class='flag-5'>分布式</b>應用測試解決方案

    HarmonyOS測試技術與實戰-分布式UI測試框架

    HDC 2021華為開發者大會 HarmonyOS測試技術與實戰-分布式UI測試框架演示
    的頭像 發表于 10-23 14:49 ?1417次閱讀
    HarmonyOS測試技術與<b class='flag-5'>實戰</b>-<b class='flag-5'>分布式</b>UI測試框架

    HarmonyOS測試技術與實戰-分布式業務音頻體驗關注點

    HDC 2021華為開發者大會 HarmonyOS測試技術與實戰-分布式業務音頻體驗關注點
    的頭像 發表于 10-23 15:59 ?1227次閱讀
    HarmonyOS測試技術與<b class='flag-5'>實戰</b>-<b class='flag-5'>分布式</b>業務音頻體驗關注點

    HarmonyOS測試技術與實戰-分布式業務音視頻體驗測試全景

    HDC 2021華為開發者大會 HarmonyOS測試技術與實戰-分布式業務音視頻體驗測試全景
    的頭像 發表于 10-23 16:03 ?1809次閱讀
    HarmonyOS測試技術與<b class='flag-5'>實戰</b>-<b class='flag-5'>分布式</b>業務音視頻體驗測試全景

    為什么需要分布式ID?求一種分布式ID生成方案

    對于單體系統來說,主鍵ID可能會常用主鍵自動的方式進行設置,這種ID生成方法在單體項目是可行的,但是對于分布式系統,分庫分表之后,就不適應了
    的頭像 發表于 01-09 10:43 ?1258次閱讀

    讓你的Nginx支持分布式追蹤OpenTracing

    換句話說,我們可以說分布式追蹤是對跨多個系統的多個請求的拼接。拼接通常由一個或多個相關 ID 完成,并且跟蹤通常是一組記錄的、跨所有系統的結構化日志事件,存儲在一個中心位置。
    的頭像 發表于 03-08 09:54 ?795次閱讀
    主站蜘蛛池模板: 日韩一区二区在线免费观看 | 亚洲欧美一区二区三区久久 | 又大又硬又爽免费视频 | 国产精品A8198V久久A片 | 亚洲黄色录像片 | 夜里18款禁用的免费B站动漫 | 女人被躁到高潮嗷嗷叫免费 | 中文字幕爆乳JULIA女教师 | 国产精品外围在线观看 | 无码欧美毛片一区二区三在线视频 | 无修肉动漫在线观看影片 | 99国产这里只有精品视频 | 手机在线看片欧美亚洲 | 偷柏自拍亚洲综合在线 | 97精品伊人久久大香线蕉app | 日本高清免费一本视频在线观看 | 最新国产麻豆精品 | 日产国产欧美韩国在线 | 日本午夜精品理论片A级APP发布 | 亚洲福利天堂网福利在线观看 | 欧美双拳极限扩张 | 亚洲 日韩 在线 国产 精品 | 草莓湿漉漉是好事还是恶性 | 性一交一无一伦一精一品 | 国产精品日本无码久久一老A | qvod小电影 | 动漫女生的逼 | 欧洲内射XXX高清 | 国产51麻豆二区精品AV视频 | 99久久久免费精品国产 | 国产亚洲综合视频 | 成人人观看的免费毛片 | 久久香蕉国产线看观看首页 | 边做边爱免费视频播放 | 在线观看亚洲专区5555 | 91亚洲精品 | 男男高H啪肉Np文多攻多一受 | 无码人妻丰满熟妇区五十路久久 | 国产AV无码成人黄网站免费 | 欧美性爱 先锋影音 | 国产精品久久久久久熟妇吹潮软件 |