hive udf 使用方法

2019年4月8日 3584点热度 0人点赞 0条评论

Hive系列文章

  1. Hive表的基本操作
  2. Hive中的集合数据类型
  3. Hive动态分区详解
  4. hive中orc格式表的数据导入
  5. Java通过jdbc连接hive
  6. 通过HiveServer2访问Hive
  7. SpringBoot连接Hive实现自助取数
  8. hive关联hbase表
  9. Hive udf 使用方法
  10. Hive基于UDF进行文本分词
  11. Hive窗口函数row number的用法
  12. 数据仓库之拉链表

hive作为一个sql查询引擎,自带了一些基本的函数,比如count(计数),sum(求和),有时候这些基本函数满足不了我们的需求,这时候就要写hive hdf(user defined funation),又叫用户自定义函数,应用与select 语句中。

哪些情况满足不了我们的需求呢,比如:

  • 需要将字段与数据库中查询一下,做个比对;
  • 需要对数据进行复杂处理;
  • 等等

hive udf 用法

下面是一个判断hive表字段是否包含'100'的简单udf:

package com.js.dataclean.hive.udf.hm2

import org.apache.hadoop.hive.ql.exec.UDF;

public class IsContains100 extends UDF{

    public String evaluate(String s){

        if(s == null || s.length() == 0){
            return "0";
        }

        return s.contains("100")?"1":"0";
    }
}

使用maven将其打包,进入hive cli,输入命令:

add jar /home/hadoop/codejar/flash_format.jar;
create temporary function isContains100 as 'com.js.dataclean.hive.udf.hm2.IsContains100';

创建完临时函数,即可使用这个函数了:

select isContains100('abc100def') from table limit 1;
1

hive udf 创建与使用步骤

  • 继承org.apache.hadoop.hive.ql.exec.UDF类,实现evaluate方法;
  • 打包上传到集群,通过create temporary function创建临时函数,不加temporary就创建了一个永久函数;
  • 通过select 语句使用;

下面是一个例子,通过读取mysql数据库中的规则,为hive中的workflow返回对应的,类型:

type workflow
a   1
a   2
b   11
b   22
b   33

我们希望,将hive的某一个字段取值为,1,2的变为a,取值为11,22,33的全部变为b,就是归类的意思。
这个udf可以这么实现:

package com.js.dataclean.hive.udf.hm2.workflow;

import org.apache.hadoop.hive.ql.exec.UDF;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @ Author: keguang
 * @ Date: 2018/12/13 16:24
 * @ version: v1.0.0
 * @ description:
 */
public class GetWorkflow extends UDF{

    private static final String host = "0.0.0.0";
    private static final String port = "3306";
    private static final String database = "root";
    private static final String userName = "root";
    private static final String password = "123456";
    private static String url = "";
    private static final String driver = "com.mysql.jdbc.Driver";
    private static Connection conn = null;
    private static Map> workflowType = null;

    static {
        url = "jdbc:mysql://" + host + ":" + port + "/" + database;
        try {
            Class.forName(driver);
            conn = DriverManager.getConnection(url, userName, password);
            workflowType = getWorkflowType(conn);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    private static Map> getWorkflowType(Connection conn){
        Map> workflowType = new HashMap<>();
        String sql = "select * from flash_player_workflow";
        PreparedStatement ps = null;
        try {
            ps = conn.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            while (rs.next()){
                String workflow = rs.getString("workflow");
                String type = rs.getString("flag");

                List workflows = workflowType.get(type);
                if(workflows == null){
                    workflows = new ArrayList<>();
                }
                workflows.add(workflow);
                workflowType.put(type, workflows);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {

            // 关闭链接
            if(conn != null){
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        return workflowType;

    }

    public String evaluate(String s){
        assert workflowType != null;

        for(String type:workflowType.keySet()){
            List workflows = workflowType.get(type);
            if(workflows.contains(s)){
                return type;
            }
        }

        return s;
    }

}

查看hive function的用法:

查month 相关的函数

show functions like '*month*';

查看 add_months 函数的用法

desc function add_months;

查看 add_months 函数的详细说明并举例

desc function extended add_months;

hive 中的 UDAF

可以看出,udf就是一个输入一个输出,输入一个性别,返回'男'或者'女',如果我们想实现select date,count(1) from table,统计每天的流量呢?这就是一个分组统计,显然是多个输入,一个输出,这时候udf已经不能满足我们的需要,就需要写udaf,user defined aggregare function(用户自定义聚合函数)。

这里写一个字符串连接函数,相当于concat的功能,将多行输入,合并为一个字符串:

package com.js.dataclean.hive.udaf.hm2;

import com.js.dataclean.utils.StringUtil;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * 实现字符串连接聚合的UDAF
 * @version v1.0.0
 * @Author:keguang
 * @Date:2018/10/22 14:36
 */
public class MutiStringConcat extends UDAF{
    public static class SumState{
        private String sumStr;
    }

    public static class SumEvaluator implements UDAFEvaluator{
        SumState sumState;

        public SumEvaluator(){
            super();
            sumState = new SumState();
            init();
        }

        @Override
        public void init() {
            sumState.sumStr = "";
        }

        /**
         * 来了一行数据
         * @param s
         * @return
         */
        public boolean iterate(String s){
            if(!StringUtil.isNull(s)){
                sumState.sumStr += s;
            }
            return true;
        }

        /**
         * 状态传递
         * @return
         */
        public SumState terminatePartial() {
            return sumState;
        }

        /**
         * 子任务合并
         * @param state
         * @return
         */
        public boolean merge(SumState state){
            if(state != null){
                sumState.sumStr += state.sumStr;
            }
            return true;
        }

        /**
         * 返回最终结果
         * @return
         */
        public String terminate(){
            return sumState.sumStr;
        }
    }
}

用法,与udf一样,还是需要打包并且到hive cli中注册使用。

关于UDAF开发注意点:

  • 需要import org.apache.hadoop.hive.ql.exec.UDAF以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator,这两个包都是必须的
  • 函数类需要继承UDAF类,内部类Evaluator实现UDAFEvaluator接口
  • Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数

    • init函数类似于构造函数,用于UDAF的初始化
    • iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean
    • terminatePartial无参数,其为iterate函数轮转结束后,返回乱转数据,iterate和terminatePartial类似于hadoop的Combiner
    • merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean
    • terminate返回最终的聚集函数结果

ikeguang

这个人很懒,什么都没留下

文章评论