A Look at PowerJob's Alarmable

Author: go4it | Published: 2024-01-13 21:56

    This article takes a look at PowerJob's Alarmable interface and its built-in implementations.

    Alarmable

    tech/powerjob/server/extension/Alarmable.java

    public interface Alarmable {
    
        void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
    }
    

    The Alarmable interface declares a single method, onFailed, which takes the alarm to deliver and targetUserList, the users who should receive it.
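    To make the contract concrete, here is a minimal sketch of a custom alarm channel. The Alarm, UserInfoDO and Alarmable types below are simplified stand-ins defined only for this example (they are not the PowerJob classes), and InMemoryAlarmService is a hypothetical channel that just records what it would have sent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: Alarm, UserInfoDO and Alarmable are simplified stand-ins
// for the PowerJob types, reduced to what this example needs.
class AlarmableSketch {

    interface Alarm {
        String fetchTitle();
        String fetchContent();
    }

    static class UserInfoDO {
        private final String username;

        UserInfoDO(String username) {
            this.username = username;
        }

        String getUsername() {
            return username;
        }
    }

    interface Alarmable {
        void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
    }

    // Hypothetical channel: records one message per target user in memory.
    static class InMemoryAlarmService implements Alarmable {
        final List<String> sent = new ArrayList<>();

        @Override
        public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
            if (targetUserList == null || targetUserList.isEmpty()) {
                return; // nobody to notify
            }
            for (UserInfoDO user : targetUserList) {
                sent.add(user.getUsername() + " <- " + alarm.fetchTitle());
            }
        }
    }

    public static void main(String[] args) {
        Alarm alarm = new Alarm() {
            @Override
            public String fetchTitle() {
                return "job instance failed";
            }

            @Override
            public String fetchContent() {
                return "jobId: 1";
            }
        };
        InMemoryAlarmService service = new InMemoryAlarmService();
        service.onFailed(alarm, Arrays.asList(new UserInfoDO("alice"), new UserInfoDO("bob")));
        service.sent.forEach(System.out::println);
    }
}
```

    Like the built-in services shown later, the sketch returns quietly when there is nobody to notify instead of throwing.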

    Alarm

    public interface Alarm extends PowerSerializable {
    
        String fetchTitle();
    
        default String fetchContent() {
            StringBuilder sb = new StringBuilder();
            JSONObject content = JSONObject.parseObject(JSONObject.toJSONString(this));
            content.forEach((key, originWord) -> {
                sb.append(key).append(": ");
                String word = String.valueOf(originWord);
                if (StringUtils.endsWithIgnoreCase(key, "time") || StringUtils.endsWithIgnoreCase(key, "date")) {
                    try {
                        if (originWord instanceof Long) {
                            word = CommonUtils.formatTime((Long) originWord);
                        }
                    }catch (Exception ignore) {
                    }
                }
                sb.append(word).append(OmsConstant.LINE_SEPARATOR);
            });
            return sb.toString();
        }
    }
    

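    Alarm declares fetchTitle and provides a default fetchContent, which serializes the alarm to JSON and emits one "key: value" line per field; Long values whose key ends in "time" or "date" (case-insensitive) are formatted with CommonUtils.formatTime. It has two implementations: JobInstanceAlarm and WorkflowInstanceAlarm.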
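    The formatting rule in fetchContent can be sketched without fastjson by feeding it a plain Map. Everything here is an assumption made for illustration: the render helper is hypothetical, a fixed UTC pattern stands in for CommonUtils.formatTime, and "\n" stands in for OmsConstant.LINE_SEPARATOR.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class AlarmContentSketch {

    // Assumption: a fixed UTC pattern standing in for CommonUtils.formatTime.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    // Renders one "key: value" line per field, as the default fetchContent does.
    static String render(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder();
        fields.forEach((key, value) -> {
            String word = String.valueOf(value);
            String lowerKey = key.toLowerCase(Locale.ROOT);
            boolean timeLike = lowerKey.endsWith("time") || lowerKey.endsWith("date");
            // Only Long values are treated as epoch milliseconds.
            if (timeLike && value instanceof Long) {
                word = FORMATTER.format(Instant.ofEpochMilli((Long) value));
            }
            // Assumption: "\n" stands in for OmsConstant.LINE_SEPARATOR.
            sb.append(key).append(": ").append(word).append('\n');
        });
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("jobId", 1L);
        fields.put("finishedTime", 0L);
        fields.put("result", "timeout");
        System.out.print(render(fields));
    }
}
```

    With these inputs the finishedTime line reads "finishedTime: 1970-01-01 00:00:00", while jobId is left untouched because its key does not end in "time" or "date".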

    DingTalkAlarmService

    tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkAlarmService.java

    @Slf4j
    @Service
    @RequiredArgsConstructor
    public class DingTalkAlarmService implements Alarmable {
    
        private final Environment environment;
    
        private Long agentId;
        private DingTalkUtils dingTalkUtils;
        private Cache<String, String> mobile2UserIdCache;
    
        private static final int CACHE_SIZE = 8192;
        /**
         * Sentinel cached for unresolved users, to guard against cache penetration
         */
        private static final String EMPTY_TAG = "EMPTY";
    
        @Override
        public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
            if (dingTalkUtils == null) {
                return;
            }
            Set<String> userIds = Sets.newHashSet();
            targetUserList.forEach(user -> {
                String phone = user.getPhone();
                if (StringUtils.isEmpty(phone)) {
                    return;
                }
                try {
                    String userId = mobile2UserIdCache.get(phone, () -> {
                        try {
                            return dingTalkUtils.fetchUserIdByMobile(phone);
                        } catch (PowerJobException ignore) {
                            return EMPTY_TAG;
                        } catch (Exception ignore) {
                            return null;
                        }
                    });
                    if (!EMPTY_TAG.equals(userId)) {
                        userIds.add(userId);
                    }
                }catch (Exception ignore) {
                }
            });
            userIds.remove(null);
    
            if (!userIds.isEmpty()) {
                String userListStr = SJ.COMMA_JOINER.skipNulls().join(userIds);
                List<DingTalkUtils.MarkdownEntity> markdownEntities = Lists.newLinkedList();
                markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost()));
                String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA);
                markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content));
    
                try {
                    dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId);
                }catch (Exception e) {
                    log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage());
                }
            }
        }
    
        @PostConstruct
        public void init() {
            String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID);
            String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY);
            String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET);
    
            log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId);
    
            if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) {
                log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable");
                return;
            }
            if (!StringUtils.isNumeric(agentId)) {
                log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId);
                return;
            }
            this.agentId = Long.valueOf(agentId);
            dingTalkUtils = new DingTalkUtils(appKey, appSecret);
            mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).softValues().build();
            log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!");
        }
    
    }
    

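    DingTalkAlarmService implements Alarmable. Its onFailed iterates over targetUserList, resolves each user's phone number to a DingTalk userId through mobile2UserIdCache (caching EMPTY_TAG when the lookup fails with a PowerJobException), and finally sends the alarm via dingTalkUtils.sendMarkdownAsync. If init() does not find agentId, appKey and appSecret, dingTalkUtils stays null and onFailed returns immediately.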
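    The EMPTY_TAG idea, caching a sentinel for phone numbers that cannot be resolved so that they are not looked up again on every alarm, can be sketched with the JDK alone. This is not the Guava-based code above: a ConcurrentHashMap replaces the Guava Cache, and the remoteLookup function (returning null for an unknown phone) is a hypothetical stand-in for dingTalkUtils.fetchUserIdByMobile.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class UserIdLookupSketch {

    static final String EMPTY_TAG = "EMPTY";

    private final Map<String, String> cache = new ConcurrentHashMap<>();
    // Hypothetical remote call; returns null when the phone is unknown.
    private final Function<String, String> remoteLookup;
    // Counts remote calls so the effect of the sentinel is observable.
    final AtomicInteger remoteCalls = new AtomicInteger();

    UserIdLookupSketch(Function<String, String> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    // Returns the userId for the phone, or null if it cannot be resolved.
    String resolve(String phone) {
        String cached = cache.computeIfAbsent(phone, p -> {
            remoteCalls.incrementAndGet();
            String userId = remoteLookup.apply(p);
            // Cache the sentinel instead of "nothing", so misses are remembered too.
            return userId == null ? EMPTY_TAG : userId;
        });
        return EMPTY_TAG.equals(cached) ? null : cached;
    }

    public static void main(String[] args) {
        Map<String, String> directory = new HashMap<>();
        directory.put("13800000000", "user-1");
        UserIdLookupSketch lookup = new UserIdLookupSketch(directory::get);
        System.out.println(lookup.resolve("13800000000"));
        System.out.println(lookup.resolve("13900000000"));
        System.out.println(lookup.resolve("13900000000"));
        System.out.println("remote calls: " + lookup.remoteCalls.get());
    }
}
```

    In the sketch the second lookup of the unknown number is answered from the cache. Unlike the original, it has no size bound or soft values, so it only illustrates the sentinel, not the eviction policy.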

    MailAlarmService

    tech/powerjob/server/extension/defaultimpl/alarm/impl/MailAlarmService.java

    @Slf4j
    @Service
    public class MailAlarmService implements Alarmable {
    
        @Resource
        private Environment environment;
    
        private JavaMailSender javaMailSender;
    
        @Value("${spring.mail.username:''}")
        private String from;
    
        @Override
        public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
            if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
                return;
            }
    
            SimpleMailMessage sm = new SimpleMailMessage();
            try {
                sm.setFrom(from);
                sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).filter(Objects::nonNull).toArray(String[]::new));
                sm.setSubject(alarm.fetchTitle());
                sm.setText(alarm.fetchContent());
    
                javaMailSender.send(sm);
            }catch (Exception e) {
                log.warn("[MailAlarmService] send mail failed, reason is {}", e.getMessage());
            }
        }
    
        @Autowired(required = false)
        public void setJavaMailSender(JavaMailSender javaMailSender) {
            this.javaMailSender = javaMailSender;
        }
    
    }
    

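    MailAlarmService implements Alarmable. Its onFailed builds a SimpleMailMessage addressed to every non-null email in targetUserList and sends it through Spring's javaMailSender.send. It returns early if the list is empty, no JavaMailSender was injected, or from is empty.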

    WebHookAlarmService

    tech/powerjob/server/extension/defaultimpl/alarm/impl/WebHookAlarmService.java

    @Slf4j
    @Service
    public class WebHookAlarmService implements Alarmable {
    
        private static final String HTTP_PROTOCOL_PREFIX = "http://";
        private static final String HTTPS_PROTOCOL_PREFIX = "https://";
    
        @Override
        public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
            if (CollectionUtils.isEmpty(targetUserList)) {
                return;
            }
            targetUserList.forEach(user -> {
                String webHook = user.getWebHook();
                if (StringUtils.isEmpty(webHook)) {
                    return;
                }
    
                // automatically prepend the protocol prefix
                if (!webHook.startsWith(HTTP_PROTOCOL_PREFIX) && !webHook.startsWith(HTTPS_PROTOCOL_PREFIX)) {
                    webHook = HTTP_PROTOCOL_PREFIX + webHook;
                }
    
                MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
                RequestBody requestBody = RequestBody.create(jsonType, JSONObject.toJSONString(alarm));
    
                try {
                    String response = HttpUtils.post(webHook, requestBody);
                    log.info("[WebHookAlarmService] invoke webhook[url={}] successfully, response is {}", webHook, response);
                }catch (Exception e) {
                    log.warn("[WebHookAlarmService] invoke webhook[url={}] failed!", webHook, e);
                }
            });
        }
    }
    

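    WebHookAlarmService implements Alarmable. Its onFailed iterates over targetUserList and, for each user with a non-empty webHook, calls HttpUtils.post(webHook, requestBody) with the alarm serialized as JSON; the HTTP callback itself is made with okhttp3.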
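    The URL handling in onFailed is easy to isolate. The sketch below pulls it into a hypothetical normalize helper (a name that does not exist in PowerJob) and makes no HTTP call.

```java
class WebHookUrlSketch {

    private static final String HTTP_PROTOCOL_PREFIX = "http://";
    private static final String HTTPS_PROTOCOL_PREFIX = "https://";

    // Returns the URL to call, or null when the user has no webhook configured.
    static String normalize(String webHook) {
        if (webHook == null || webHook.isEmpty()) {
            return null;
        }
        // Default to plain http when no protocol prefix is given.
        if (!webHook.startsWith(HTTP_PROTOCOL_PREFIX) && !webHook.startsWith(HTTPS_PROTOCOL_PREFIX)) {
            return HTTP_PROTOCOL_PREFIX + webHook;
        }
        return webHook;
    }

    public static void main(String[] args) {
        System.out.println(normalize("example.com/hook"));
        System.out.println(normalize("https://example.com/hook"));
    }
}
```

    Because the check uses a case-sensitive startsWith, a value such as "HTTP://example.com" would get a second prefix in this sketch, so lowercase prefixes are the safe choice when configuring a webhook.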

    Summary

    PowerJob's Alarmable interface declares the onFailed method, which takes an alarm and a targetUserList. It has three implementations: DingTalkAlarmService (built on DingTalkClient), MailAlarmService (built on Spring's JavaMailSender), and WebHookAlarmService (built on okhttp3's OkHttpClient).
