Kettle系列教程-第十二章:使用Java执行Kettle作业

Author Avatar
山小杰 7月 30, 2018
  • 在其它设备中阅读本文章

本系列教程基于Kettle 8.1(pdi-ce-8.1.0.0-365)。大部分内容同样适用于Kettle 7.x版本。
章节目录:

本章说明

Kettle是提供了一些api的,我们可以通过这些api去执行Kettle作业、转换。除了执行作业Kettle还有其他很多api可供使用,本章只介绍作业的执行,其他api有兴趣的可以去探索探索。

搭建Kettle运行环境

首先需要搭建一个Kettle运行环境,很简单,就是从data-integration\lib\目录下复制部分核心jar包出来,导入到java项目(jdk1.8)中,本章节所需jar包如下(不要忘了数据库连接驱动):
img

代码示例(作业、转换、资源库)

这里就直接放代码了,包含作业文件、转换文件、资源库作业的执行示例。

package com.staroon.kettle.exec;

import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import java.util.HashMap;
import java.util.Map;

public class RunKettleJob {
    public static void main(String[] args) {

        // 参数列表
        Map<String, String> params = new HashMap<>();
        params.put("filename", "runjob");
        params.put("extend", "txt");

        String kjbPath = "D:/kettle/jobs/Kettledoc/常规作业示例.kjb";
        String ktrPath = "D:/kettle/jobs/Kettledoc/转换任务.ktr";
//        runJob(params, kjbPath);
//        runTrans(params, ktrPath);
        runRepoJob(params);
    }

    /**
     * 运行资源库中的作业
     *
     * @param params 作业参数
     */
    public static void runRepoJob(Map<String, String> params) {
        try {
            KettleEnvironment.init();
            KettleDatabaseRepository repository = new KettleDatabaseRepository();
            // 配置资源库数据库连接信息
            DatabaseMeta databaseMeta = new DatabaseMeta(
                    "kettle",
                    "mysql",
                    "jdbc",
                    "127.0.0.1",
                    "kettle",
                    "3308",
                    "root",
                    "lwsjfwq"
            );

            // 配置连接参数,指定连接编码为UTF8,若不指定则不能读取中文目录或者中文名作业
            databaseMeta.getAttributes().put("EXTRA_OPTION_MYSQL.characterEncoding", "utf8");

            // 连接测试
            if (databaseMeta.testConnection().startsWith("正确")) {
                System.out.println("数据库连接成功");
            } else {
                System.out.println("数据库连接失败");
                return;
            }

            // 配置资源库
            KettleDatabaseRepositoryMeta repositoryMeta = new KettleDatabaseRepositoryMeta(
                    "kettle",
                    "kettle",
                    "Kettle Repository",
                    databaseMeta
            );
            repository.init(repositoryMeta);
            // 连接资源库
            repository.connect("admin", "admin");

            // 指定job或者trans所在的目录
            RepositoryDirectoryInterface dir = repository.findDirectory("/批处理/");

            // 选择资源库中的作业
            JobMeta jobMeta = repository.loadJob("资源库作业示例", dir, null, null);

            // 配置作业参数
            for (String param : params.keySet()) {
                jobMeta.setParameterValue(param, params.get(param));
            }

            Job job = new Job(repository, jobMeta);
            job.setLogLevel(LogLevel.DEBUG);
            //执行作业
            job.start();
            //等待作业执行结束
            job.waitUntilFinished();
            if (job.getErrors() > 0) {
                throw new Exception("作业执行出错");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 运行转换文件
     *
     * @param params  转换参数
     * @param ktrPath 转换文件的路径,后缀ktr
     */
    public static void runTrans(Map<String, String> params, String ktrPath) {
        try {
            // 初始化
            KettleEnvironment.init();
            EnvUtil.environmentInit();
            TransMeta transMeta = new TransMeta(ktrPath);

            // 配置参数
            for (String param : params.keySet()) {
                transMeta.setParameterValue(param, params.get(param));
            }

            Trans trans = new Trans(transMeta);

            // 设置日志级别
            trans.setLogLevel(LogLevel.DEBUG);

            // 执行转换
            trans.execute(null);
            // 等待转换执行结束
            trans.waitUntilFinished();
            // 抛出异常
            if (trans.getErrors() > 0) {
                throw new Exception("转换执行出错");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 运行作业文件
     *
     * @param params  作业参数
     * @param kjbPath 作业文件路径,后缀kjb
     */
    public static void runJob(Map<String, String> params, String kjbPath) {
        try {
            KettleEnvironment.init();
            JobMeta jobMeta = new JobMeta(kjbPath, null);

            // 配置作业参数
            for (String param : params.keySet()) {
                jobMeta.setParameterValue(param, params.get(param));
            }
            // 配置变量
            // jobMeta.setVariable("name","value");

            Job job = new Job(null, jobMeta);

            // 设置日志级别
            job.setLogLevel(LogLevel.DEBUG);

            // 启动作业
            job.start();
            // 等待作业执行完毕
            job.waitUntilFinished();
            if (job.getErrors() > 0) {
                throw new Exception("作业执行出错");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

本章完!