美文网首页
开源定时任务power-job的告警实现

开源定时任务power-job的告警实现

作者: 天草二十六_简村人 | 来源:发表于2022-12-15 23:59 被阅读0次

    一、报警接口Alarmable

    package tech.powerjob.server.extension;
    
    import tech.powerjob.server.persistence.remote.model.UserInfoDO;
    import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
    
    import java.util.List;
    
    /**
     * Alarm channel contract: an implementation receives an {@link Alarm} together with
     * the users it concerns and is responsible for delivering it through its own medium.
     *
     * @author tjq
     * @since 2020/4/19
     */
    public interface Alarmable {
    
        /**
         * Handles one alarm for the given users.
         *
         * @param alarm          the alarm to deliver; exposes a title and a content text
         * @param targetUserList the users the alarm is addressed to
         */
        void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
    }
    

    二、告警实现类

    多种实现,这里实现了钉钉消息、邮件和webhook三种方式。你可以继续实现企业微信、sms等其他方式的告警。

    DingTalkAlarmService

    依赖于DingTalkUtils,详细内容我将在另外一篇文章里列出,希望能帮助到对接钉钉的同学。

    package tech.powerjob.server.extension.defaultimpl.alarm.impl;
    
    import tech.powerjob.common.OmsConstant;
    import tech.powerjob.common.exception.PowerJobException;
    import tech.powerjob.common.utils.NetUtils;
    import tech.powerjob.server.common.PowerJobServerConfigKey;
    import tech.powerjob.server.common.SJ;
    import tech.powerjob.server.persistence.remote.model.UserInfoDO;
    import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
    import tech.powerjob.server.extension.Alarmable;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.collect.Lists;
    import com.google.common.collect.Sets;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.core.env.Environment;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.Resource;
    import java.util.List;
    import java.util.Set;
    
    /**
     * 钉钉告警服务
     *
     * @author tjq
     * @since 2020/8/6
     */
    @Slf4j
    @Service
    public class DingTalkAlarmService implements Alarmable {
    
        @Resource
        private Environment environment;
    
        private Long agentId;
        private DingTalkUtils dingTalkUtils;
        private Cache<String, String> mobile2UserIdCache;
    
        private static final int CACHE_SIZE = 8192;
        // 防止缓存击穿
        private static final String EMPTY_TAG = "EMPTY";
    
        @Override
        public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
            if (dingTalkUtils == null) {
                return;
            }
            Set<String> userIds = Sets.newHashSet();
            targetUserList.forEach(user -> {
                String phone = user.getPhone();
                if (StringUtils.isEmpty(phone)) {
                    return;
                }
                try {
                    String userId = mobile2UserIdCache.get(phone, () -> {
                        try {
                            return dingTalkUtils.fetchUserIdByMobile(phone);
                        } catch (PowerJobException ignore) {
                            return EMPTY_TAG;
                        } catch (Exception ignore) {
                            return null;
                        }
                    });
                    if (!EMPTY_TAG.equals(userId)) {
                        userIds .add(userId);
                    }
                }catch (Exception ignore) {
                }
            });
            userIds.remove(null);
    
            if (!userIds.isEmpty()) {
                String userListStr = SJ.COMMA_JOINER.skipNulls().join(userIds);
                List<DingTalkUtils.MarkdownEntity> markdownEntities = Lists.newLinkedList();
                markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost()));
                String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA);
                markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content));
    
                try {
                    dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId);
                }catch (Exception e) {
                    log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage());
                }
            }
        }
    
        @PostConstruct
        public void init() {
            String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID);
            String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY);
            String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET);
    
            log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId);
    
            if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) {
                log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable");
                return;
            }
            if (!StringUtils.isNumeric(agentId)) {
                log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId);
                return;
            }
            this.agentId = Long.valueOf(agentId);
            dingTalkUtils = new DingTalkUtils(appKey, appSecret);
            mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
            log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!");
        }
    
    }
    
    

    MailAlarmService

    邮件发送主要依赖javamail,没有太多可讲的。

    package tech.powerjob.server.extension.defaultimpl.alarm.impl;
    
    import tech.powerjob.server.persistence.remote.model.UserInfoDO;
    import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
    import tech.powerjob.server.extension.Alarmable;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.core.env.Environment;
    import org.springframework.mail.SimpleMailMessage;
    import org.springframework.mail.javamail.JavaMailSender;
    import org.springframework.stereotype.Service;
    import org.springframework.util.CollectionUtils;
    import org.springframework.util.StringUtils;
    
    import javax.annotation.Resource;
    import java.util.List;
    import java.util.Objects;
    
    /**
     * E-mail alarm channel: sends the alarm as a plain-text mail to every target user
     * that has an e-mail address.
     * <p>
     * The channel is inactive when no {@link JavaMailSender} bean is available or when
     * the sender address ({@code spring.mail.username}) is not configured.
     *
     * @author tjq
     * @since 2020/4/30
     */
    @Slf4j
    @Service
    public class MailAlarmService implements Alarmable {
    
        @Resource
        private Environment environment;
    
        private JavaMailSender javaMailSender;
    
        private String from;
        private static final String FROM_KEY = "spring.mail.username";
    
        /**
         * Mails the alarm title and content to all target users with a non-blank e-mail address.
         * Returns silently when the channel is inactive or when there is nobody to mail.
         *
         * @param alarm          the alarm to deliver
         * @param targetUserList the users to notify; may be null or empty
         */
        @Override
        public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
            initFrom();
            if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || !StringUtils.hasText(from)) {
                return;
            }
    
            String[] recipients = targetUserList.stream()
                    .filter(Objects::nonNull)
                    .map(UserInfoDO::getEmail)
                    .filter(StringUtils::hasText)
                    .toArray(String[]::new);
            if (recipients.length == 0) {
                // Nobody has an address: sending would only fail and log a misleading warning.
                return;
            }
    
            SimpleMailMessage sm = new SimpleMailMessage();
            try {
                sm.setFrom(from);
                sm.setTo(recipients);
                sm.setSubject(alarm.fetchTitle());
                sm.setText(alarm.fetchContent());
    
                javaMailSender.send(sm);
            } catch (Exception e) {
                // Trailing throwable keeps the stack trace in the log.
                log.warn("[MailAlarmService] send mail failed, reason is {}", e.getMessage(), e);
            }
        }
    
        /**
         * Optional injection point: the mail sender bean may be absent, in which case
         * this channel stays inactive.
         *
         * @param javaMailSender the mail sender to use
         */
        @Autowired(required = false)
        public void setJavaMailSender(JavaMailSender javaMailSender) {
            this.javaMailSender = javaMailSender;
        }
    
        // Resolved lazily from the environment: injecting it with @Value would fail
        // when the property does not exist.
        private void initFrom() {
            if (!StringUtils.hasText(from)) {
                from = environment.getProperty(FROM_KEY);
            }
        }
    }
    
    

    WebHookAlarmService

    发送http请求使用的是okhttp3框架,我也将在另外一篇文章里单独列出,希望可以帮助到相关同学。当然,发送http请求的框架太多了,你可以任意选择。

    package tech.powerjob.server.extension.defaultimpl.alarm.impl;
    
    import com.alibaba.fastjson.JSONObject;
    import tech.powerjob.common.OmsConstant;
    import tech.powerjob.common.utils.HttpUtils;
    import tech.powerjob.server.persistence.remote.model.UserInfoDO;
    import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
    import tech.powerjob.server.extension.Alarmable;
    import lombok.extern.slf4j.Slf4j;
    import okhttp3.MediaType;
    import okhttp3.RequestBody;
    import org.springframework.stereotype.Service;
    import org.springframework.util.CollectionUtils;
    import org.springframework.util.StringUtils;
    
    import java.util.List;
    
    /**
     * HTTP callback alarm channel: POSTs the alarm, serialised as JSON, to the webhook URL
     * of every target user that has one.
     *
     * @author tjq
     * @since 11/14/20
     */
    @Slf4j
    @Service
    public class WebHookAlarmService implements Alarmable {
    
        private static final String HTTP_PROTOCOL_PREFIX = "http://";
        private static final String HTTPS_PROTOCOL_PREFIX = "https://";
    
        /**
         * Posts the alarm to each user's webhook. A failure for one webhook is logged and
         * does not prevent the remaining webhooks from being called.
         *
         * @param alarm          the alarm to deliver
         * @param targetUserList the users to notify; may be null or empty
         */
        @Override
        public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
            if (CollectionUtils.isEmpty(targetUserList)) {
                return;
            }
    
            // Serialised on first use and then shared by all requests.
            String payload = null;
            for (UserInfoDO user : targetUserList) {
                if (user == null || !StringUtils.hasText(user.getWebHook())) {
                    continue;
                }
                String webHook = normalizeUrl(user.getWebHook());
    
                if (payload == null) {
                    payload = JSONObject.toJSONString(alarm);
                }
                MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
                RequestBody requestBody = RequestBody.create(jsonType, payload);
    
                try {
                    String response = HttpUtils.post(webHook, requestBody);
                    log.info("[WebHookAlarmService] invoke webhook[url={}] successfully, response is {}", webHook, response);
                } catch (Exception e) {
                    log.warn("[WebHookAlarmService] invoke webhook[url={}] failed!", webHook, e);
                }
            }
        }
    
        /**
         * Trims the URL and prepends {@code http://} when it carries no http/https protocol.
         * The protocol check ignores case, so {@code HTTPS://host} is left untouched.
         */
        private static String normalizeUrl(String rawUrl) {
            String url = rawUrl.trim();
            boolean hasProtocol = startsWithIgnoreCase(url, HTTP_PROTOCOL_PREFIX)
                    || startsWithIgnoreCase(url, HTTPS_PROTOCOL_PREFIX);
            return hasProtocol ? url : HTTP_PROTOCOL_PREFIX + url;
        }
    
        /** Case-insensitive variant of {@link String#startsWith(String)}. */
        private static boolean startsWithIgnoreCase(String text, String prefix) {
            return text.regionMatches(true, 0, prefix, 0, prefix.length());
        }
    }
    
    

    三、消息基类Alarm

    是一个接口,默认实现了拼接消息内容的方法。不同种类的消息,包含的字段不一样,但是都实现该接口。

    package tech.powerjob.server.extension.defaultimpl.alarm.module;
    
    import com.alibaba.fastjson.JSONObject;
    import tech.powerjob.common.OmsConstant;
    import tech.powerjob.common.PowerSerializable;
    import tech.powerjob.common.utils.CommonUtils;
    import org.apache.commons.lang3.StringUtils;
    
    /**
     * Alarm payload. Implementations supply a title; the content text is derived from
     * the implementation's own JSON representation.
     *
     * @author tjq
     * @since 2020/8/1
     */
    public interface Alarm extends PowerSerializable {
    
        /**
         * @return the alarm title
         */
        String fetchTitle();
    
        /**
         * Renders this object as text, one {@code key: value} pair per line.
         * A value whose key ends with "time" or "date" (ignoring case) and which is a
         * {@link Long} is rendered through {@code CommonUtils.formatTime}; if that
         * formatting throws, the plain string form of the value is kept.
         *
         * @return the rendered content, each line terminated by {@code OmsConstant.LINE_SEPARATOR}
         */
        default String fetchContent() {
            JSONObject fields = JSONObject.parseObject(JSONObject.toJSONString(this));
            StringBuilder rendered = new StringBuilder();
            fields.forEach((name, rawValue) -> {
                rendered.append(name).append(": ");
                String text = String.valueOf(rawValue);
                boolean looksLikeTimestamp = StringUtils.endsWithIgnoreCase(name, "time")
                        || StringUtils.endsWithIgnoreCase(name, "date");
                if (looksLikeTimestamp && rawValue instanceof Long) {
                    try {
                        text = CommonUtils.formatTime((Long) rawValue);
                    } catch (Exception ignore) {
                        // keep the plain string form when formatting fails
                    }
                }
                rendered.append(text).append(OmsConstant.LINE_SEPARATOR);
            });
            return rendered.toString();
        }
    }
    
    

    四、告警服务

    对外提供的功能入口,供其他代码调用。主要是方法alarmFailed()。
    它的入参,第一个是消息内容,只要是实现了接口Alarm的对象即可,第二个是告警对象-消息接收人。

    image.png
    package tech.powerjob.server.extension.defaultimpl.alarm;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
    import tech.powerjob.server.extension.Alarmable;
    import tech.powerjob.server.persistence.remote.model.UserInfoDO;
    import com.google.common.collect.Lists;
    import com.google.common.collect.Queues;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.List;
    import java.util.concurrent.*;
    
    /**
     * Alarm entry point: fans an alarm out to every registered {@link Alarmable} channel
     * on a dedicated thread pool, so callers are not blocked by slow channels.
     *
     * @author tjq
     * @since 2020/4/19
     */
    @Slf4j
    @Component
    public class AlarmCenter {
    
        private final ExecutorService alarmPool;
        private final List<Alarmable> channels = Lists.newArrayList();
    
        /**
         * Creates the alarm thread pool and registers all {@link Alarmable} beans.
         *
         * @param alarmables the alarm channels to dispatch to
         */
        @Autowired
        public AlarmCenter(List<Alarmable> alarmables) {
            int cores = Runtime.getRuntime().availableProcessors();
            ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("AlarmPool-%d").build();
            ThreadPoolExecutor executor = new ThreadPoolExecutor(cores, cores, 5, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue(), factory);
            // With core size == max size the keep-alive applies to no thread unless core
            // threads may time out; enabling it lets idle workers end after 5 minutes.
            executor.allowCoreThreadTimeOut(true);
            alarmPool = executor;
    
            alarmables.forEach(bean -> {
                channels.add(bean);
                log.info("[AlarmCenter] bean(className={},obj={}) register to AlarmCenter successfully!", bean.getClass().getName(), bean);
            });
        }
    
        /**
         * Asynchronously delivers the alarm through every registered channel. An exception
         * thrown by one channel is logged and does not stop the remaining channels.
         *
         * @param alarm          the alarm to deliver
         * @param targetUserList the users to notify
         */
        public void alarmFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
            alarmPool.execute(() -> {
                for (Alarmable channel : channels) {
                    try {
                        channel.onFailed(alarm, targetUserList);
                    } catch (Exception e) {
                        log.warn("[AlarmCenter] alarm failed.", e);
                    }
                }
            });
        }
    }
    
    

    告警对象

    不同的告警方式,需要填写的字段不一,可能是手机号,可能是邮箱,也可以是webhook地址。

    package tech.powerjob.server.persistence.remote.model;
    
    import lombok.Data;
    import org.hibernate.annotations.GenericGenerator;
    
    import javax.persistence.*;
    import java.util.Date;
    
    /**
     * User information table.
     * Each alarm channel reads its own contact field from this entity:
     * {@code phone}, {@code email} or {@code webHook}.
     *
     * @author tjq
     * @since 2020/4/12
     */
    @Data
    @Entity
    @Table
    public class UserInfoDO {
    
        /** Primary key, produced by the "native" generator declared below. */
        @Id
        @GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
        @GenericGenerator(name = "native", strategy = "native")
        private Long id;
    
        private String username;
    
        // NOTE(review): @Data generates a toString() that includes this field — confirm it is never logged.
        private String password;
        /**
         * Phone number.
         */
        private String phone;
        /**
         * E-mail address.
         */
        private String email;
        /**
         * Webhook URL.
         */
        private String webHook;
        /**
         * Extension field.
         */
        private String extra;
    
        // presumably the record creation time — TODO confirm where it is set
        private Date gmtCreate;
    
        // presumably the last modification time — TODO confirm where it is set
        private Date gmtModified;
    }
    
    

    相关文章

      网友评论

          本文标题:开源定时任务power-job的告警实现

          本文链接:https://www.haomeiwen.com/subject/ueldqdtx.html