美文网首页
聊聊PowerJob的AbstractScriptProcessor

聊聊PowerJob的AbstractScriptProcessor

作者: go4it | 来源:发表于2024-01-03 09:13 被阅读0次

    本文主要研究一下PowerJob的AbstractScriptProcessor

    AbstractScriptProcessor

    tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java

    /**
     * Base class for processors that run a script in an external process.
     *
     * <p>{@link #process0(TaskContext)} materializes the script on local disk, makes it
     * executable on non-Windows systems, launches it with {@link #getRunCommand()} and
     * reports the exit status together with the captured stdout/stderr.
     */
    @Slf4j
    public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
    
        /** Pool that runs the stdout/stderr copy tasks of the launched scripts. */
        private static final ForkJoinPool POOL = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors());
        /** URL schemes for which the job parameter is treated as a download link. */
        private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
        protected static final String SH_SHELL = "/bin/sh";
        protected static final String CMD_SHELL = "cmd.exe";
    
        /** Directory below the PowerJob workspace in which script files are created. */
        private static final String WORKER_DIR = PowerFileUtils.workspace() + "/official_script_processor/";
    
        /**
         * Upper bound, per stream, for draining remaining output after the script exited.
         * Bounded so that a background child that inherited the pipes cannot block the job forever.
         */
        private static final long STREAM_DRAIN_TIMEOUT_MS = 5000L;
    
        /**
         * Prepares the script file, runs it and collects its output.
         *
         * @param context task context providing the job parameters, instance id and online logger
         * @return a result that is successful only when the script terminated normally with exit code 0;
         *         the message contains the captured stdout ({@code [INPUT]}) and stderr ({@code [ERROR]})
         * @throws Exception if the script file cannot be prepared or the process cannot be started
         */
        @Override
        protected ProcessResult process0(TaskContext context) throws Exception {
            OmsLogger omsLogger = context.getOmsLogger();
            String scriptParams = CommonUtils.parseParams(context);
            omsLogger.info("[SYSTEM] ScriptProcessor start to process, params: {}", scriptParams);
            if (scriptParams == null) {
                String message = "[SYSTEM] ScriptParams is null, please check jobParam configuration.";
                omsLogger.warn(message);
                return new ProcessResult(false, message);
            }
            String scriptPath = prepareScriptFile(context.getInstanceId(), scriptParams);
            omsLogger.info("[SYSTEM] Generate executable file successfully, path: {}", scriptPath);
    
            if (SystemUtils.IS_OS_WINDOWS && StringUtils.equals(getRunCommand(), SH_SHELL)) {
                String message = String.format("[SYSTEM] Current OS is %s where shell scripts cannot run.", SystemUtils.OS_NAME);
                omsLogger.warn(message);
                return new ProcessResult(false, message);
            }
    
            // 1. Grant execute permission (not applicable on Windows).
            if (!SystemUtils.IS_OS_WINDOWS) {
                ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
                // Waiting without draining is safe here: chmod prints at most a short error message,
                // so it cannot fill the pipe buffer and dead-lock.
                int chmodExitCode = chmodPb.start().waitFor();
                if (chmodExitCode == 0) {
                    omsLogger.info("[SYSTEM] chmod 755 authorization complete, ready to start execution~");
                } else {
                    // Not fatal: the script is passed to the interpreter as an argument.
                    omsLogger.warn("[SYSTEM] chmod 755 exited with code {}, trying to execute the script anyway.", chmodExitCode);
                }
            }
    
            // 2. Run the target script.
            ProcessBuilder pb = StringUtils.equals(getRunCommand(), CMD_SHELL) ?
                    new ProcessBuilder(getRunCommand(), "/c", scriptPath)
                    : new ProcessBuilder(getRunCommand(), scriptPath);
            Process process = pb.start();
    
            StringBuilder inputBuilder = new StringBuilder();
            StringBuilder errorBuilder = new StringBuilder();
    
            // Pessimistic default: only a normal termination with exit code 0 counts as success,
            // so an interrupted run is never reported as successful.
            boolean success = false;
            String result;
    
            final Charset charset = getCharset();
            try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) {
    
                // Both streams are drained concurrently so the script cannot block on a full pipe.
                java.util.concurrent.Future<?> stdoutCopy = POOL.submit(() -> copyStream(is, inputBuilder, omsLogger, charset));
                java.util.concurrent.Future<?> stderrCopy = POOL.submit(() -> copyStream(es, errorBuilder, omsLogger, charset));
    
                success = process.waitFor() == 0;
    
                // Let the copy tasks finish before the streams are closed and the builders are read;
                // otherwise trailing output is lost or replaced by a "Stream closed" exception.
                awaitCopy(stdoutCopy, omsLogger);
                awaitCopy(stderrCopy, omsLogger);
    
            } catch (InterruptedException ie) {
                success = false;
                // Do not leave the script running once the task has been told to stop.
                process.destroy();
                omsLogger.info("[SYSTEM] ScriptProcessor has been interrupted");
                // Restore the flag so callers further up the stack can observe the interruption.
                Thread.currentThread().interrupt();
            } finally {
                result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder.toString(), errorBuilder.toString());
            }
            return new ProcessResult(success, result);
        }
    
        /**
         * Waits, for at most {@link #STREAM_DRAIN_TIMEOUT_MS}, until a stream copy task has finished.
         *
         * @param copyTask  the submitted copy task
         * @param omsLogger online logger that receives a warning when the wait times out
         * @throws InterruptedException if the current thread is interrupted while waiting
         */
        private static void awaitCopy(java.util.concurrent.Future<?> copyTask, OmsLogger omsLogger) throws InterruptedException {
            try {
                copyTask.get(STREAM_DRAIN_TIMEOUT_MS, java.util.concurrent.TimeUnit.MILLISECONDS);
            } catch (java.util.concurrent.TimeoutException te) {
                omsLogger.warn("[SYSTEM] Script output was not fully drained in time, the collected result may be incomplete.");
            } catch (java.util.concurrent.ExecutionException ee) {
                log.warn("[ScriptProcessor] stream copy task failed.", ee);
            }
        }
    
        /**
         * Builds the file name of the script for one instance.
         * @param instanceId id of instance
         * @return the script file name
         */
        protected abstract String getScriptName(Long instanceId);
    
        /**
         * Returns the command used to run the script (e.g. the shell processor returns /bin/sh).
         * @return the command that executes the script
         */
        protected abstract String getRunCommand();
    
        //......
    }
    

    AbstractScriptProcessor继承了CommonBasicProcessor,它定义了一个parallelism为4*Runtime.getRuntime().availableProcessors()的ForkJoinPool;其process0方法先读取scriptParams,然后执行prepareScriptFile获取scriptPath,接着使用chmod变更script权限为755,然后通过getRunCommand获取命令,接着往pool提交copyStream,等待process返回

    prepareScriptFile

        /**
         * Materializes the script of one job instance on local disk.
         *
         * <p>An already existing file for the instance is reused as is. Otherwise the file is
         * created and filled either by downloading {@code processorInfo} (when it is an
         * http/https/ftp URL) or by writing {@code processorInfo} itself as the script content.
         *
         * @param instanceId    id of the instance, used to derive the script file name
         * @param processorInfo a download URL or the literal script content
         * @return path of the script file
         * @throws IOException      if the file cannot be created, downloaded or written
         * @throws RuntimeException if the script file could not be newly created
         */
        private String prepareScriptFile(Long instanceId, String processorInfo) throws IOException {
            String scriptPath = WORKER_DIR + getScriptName(instanceId);
            File script = new File(scriptPath);
            if (script.exists()) {
                return scriptPath;
            }
            File dir = new File(script.getParent());
            // mkdirs() also returns false when the directory already exists, so its result is not
            // a failure signal; a really missing directory makes createNewFile() throw below.
            dir.mkdirs();
            if (!script.createNewFile()) {
                throw new RuntimeException("create script file failed");
            }
    
            try {
                // Download only when the parameter really is a URL ("http://..."); a bare prefix
                // match would also capture inline scripts that merely start with e.g. "ftp ".
                boolean downloadLink = false;
                for (String protocol : DOWNLOAD_PROTOCOL) {
                    if (processorInfo.startsWith(protocol + "://")) {
                        downloadLink = true;
                        break;
                    }
                }
                if (downloadLink) {
                    FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000);
                    return scriptPath;
                }
    
                // A null charset falls back to the platform default, as FileWriter would use.
                final Charset configured = getCharset();
                final Charset charset = configured != null ? configured : Charset.defaultCharset();
                try (Writer fstream = new OutputStreamWriter(Files.newOutputStream(script.toPath()), charset); BufferedWriter out = new BufferedWriter(fstream)) {
                    out.write(processorInfo);
                    out.flush();
                }
                return scriptPath;
            } catch (IOException | RuntimeException e) {
                // Remove the empty/partial file; otherwise the exists() shortcut above would
                // hand a broken script to the next attempt of the same instance.
                FileUtils.deleteQuietly(script);
                throw e;
            }
        }
    

    prepareScriptFile先通过getScriptName获取scriptPath,如果是http、https、ftp链接则通过FileUtils.copyURLToFile下载,否则把scriptParams写入到scriptPath

    copyStream

        /**
         * Reads a process stream line by line, collecting the text and mirroring it to the online log.
         *
         * <p>Failures are logged and recorded in {@code sb}; they are never propagated.
         *
         * @param is        stream to read; it is closed when this method returns
         * @param sb        receives the lines read, separated by {@code '\n'}
         * @param omsLogger online logger that receives every line
         * @param charset   charset used to decode the stream; {@code null} means platform default
         */
        private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) {
            // prepareScriptFile tolerates a null charset, so do the same here instead of
            // failing with a NullPointerException before anything has been read.
            final Charset effectiveCharset = charset != null ? charset : Charset.defaultCharset();
            String line;
            boolean firstLine = true;
            try (BufferedReader br = new BufferedReader(new InputStreamReader(is, effectiveCharset))) {
                while ((line = br.readLine()) != null) {
                    // readLine() strips the terminator; re-insert one so that multi-line
                    // output does not run together in the collected result.
                    if (!firstLine) {
                        sb.append('\n');
                    }
                    firstLine = false;
                    sb.append(line);
                    // Mirror the line to the online log.
                    omsLogger.info(line);
                }
            } catch (Exception e) {
                log.warn("[ScriptProcessor] copyStream failed.", e);
                omsLogger.warn("[SYSTEM] copyStream failed.", e);
    
                sb.append("Exception: ").append(e);
            }
        }
    

    copyStream会读取InputStream到StringBuilder,同时打印到omsLogger

    ShellProcessor

    tech/powerjob/official/processors/impl/script/ShellProcessor.java

    /**
     * Script processor that runs the job content as a shell script via {@code SH_SHELL}.
     */
    public class ShellProcessor extends AbstractScriptProcessor {
    
        /** File-name pattern; {@code %d} is replaced by the instance id. */
        private static final String SCRIPT_NAME_PATTERN = "shell_%d.sh";
    
        /**
         * @return the shell interpreter that executes the script
         */
        @Override
        protected String getRunCommand() {
            return SH_SHELL;
        }
    
        /**
         * @param instanceId id of instance
         * @return the script file name for the given instance
         */
        @Override
        protected String getScriptName(Long instanceId) {
            final String scriptName = String.format(SCRIPT_NAME_PATTERN, instanceId);
            return scriptName;
        }
    }
    

    ShellProcessor的getScriptName是基于shell_%d.sh和instanceId生成的;其getRunCommand为/bin/sh

    CMDProcessor

    tech/powerjob/official/processors/impl/script/CMDProcessor.java

    /**
     * Script processor that runs the job content as a Windows batch file via {@code cmd.exe}.
     */
    public class CMDProcessor extends AbstractScriptProcessor {
    
        /**
         * @param instanceId id of instance
         * @return the batch file name for the given instance
         */
        @Override
        protected String getScriptName(Long instanceId) {
            return String.format("cmd_%d.bat", instanceId);
        }
    
        /**
         * Returns the shared {@code CMD_SHELL} constant rather than a duplicate literal: the base
         * class compares the run command against that constant to decide whether to pass "/c".
         *
         * @return the command interpreter that executes the batch file
         */
        @Override
        protected String getRunCommand() {
            return CMD_SHELL;
        }
    
        /**
         * @return the JVM default charset
         */
        @Override
        protected Charset getCharset() {
            return Charset.defaultCharset();
        }
    }
    

    CMDProcessor的getScriptName是基于cmd_%d.bat和instanceId生成,其getRunCommand为cmd.exe

    PowerShellProcessor

    tech/powerjob/official/processors/impl/script/PowerShellProcessor.java

    /**
     * Script processor that runs the job content as a PowerShell script via {@code powershell.exe}.
     */
    public class PowerShellProcessor extends AbstractScriptProcessor {
    
        /** File-name pattern; {@code %d} is replaced by the instance id. */
        private static final String SCRIPT_NAME_PATTERN = "powershell_%d.ps1";
    
        /** Executable used to launch the script. */
        private static final String RUN_COMMAND = "powershell.exe";
    
        /**
         * @return the JVM default charset
         */
        @Override
        protected Charset getCharset() {
            final Charset platformCharset = Charset.defaultCharset();
            return platformCharset;
        }
    
        /**
         * @return the executable that runs the script
         */
        @Override
        protected String getRunCommand() {
            return RUN_COMMAND;
        }
    
        /**
         * @param instanceId id of instance
         * @return the script file name for the given instance
         */
        @Override
        protected String getScriptName(Long instanceId) {
            final String scriptName = String.format(SCRIPT_NAME_PATTERN, instanceId);
            return scriptName;
        }
    }
    

    PowerShellProcessor的getScriptName是基于powershell_%d.ps1和instanceId生成,其getRunCommand为powershell.exe

    PythonProcessor

    tech/powerjob/official/processors/impl/script/PythonProcessor.java

    /**
     * Script processor that runs the job content as a Python script via the {@code python} command.
     */
    public class PythonProcessor extends AbstractScriptProcessor {
    
        /** File-name pattern; {@code %d} is replaced by the instance id. */
        private static final String SCRIPT_NAME_PATTERN = "python_%d.py";
    
        /** Command used to launch the script. */
        private static final String RUN_COMMAND = "python";
    
        /**
         * @return the command that runs the script
         */
        @Override
        protected String getRunCommand() {
            return RUN_COMMAND;
        }
    
        /**
         * @param instanceId id of instance
         * @return the script file name for the given instance
         */
        @Override
        protected String getScriptName(Long instanceId) {
            final String scriptName = String.format(SCRIPT_NAME_PATTERN, instanceId);
            return scriptName;
        }
    }
    

    PythonProcessor的getScriptName是基于python_%d.py和instanceId生成,其getRunCommand为python

    小结

    AbstractScriptProcessor继承了CommonBasicProcessor,它有四个实现类分别是ShellProcessor、CMDProcessor、PowerShellProcessor、PythonProcessor;它定义了getScriptName、getRunCommand抽象方法;其process0方法主要是把scriptParams写入到本地文件(scriptParams是http、https、ftp的则根据url进行下载),然后修改权限为755,然后执行pb.start(),再将input及errorStream收集到StringBuilder并打印到omsLogger,最后process.waitFor()等待处理完成。

    相关文章

      网友评论

          本文标题:聊聊PowerJob的AbstractScriptProcessor

          本文链接:https://www.haomeiwen.com/subject/ijbmndtx.html