SpringBoot连接Hive实现自助取数

2020年11月5日 7835点热度 0人点赞 0条评论

Hive系列文章

  1. Hive表的基本操作
  2. Hive中的集合数据类型
  3. Hive动态分区详解
  4. hive中orc格式表的数据导入
  5. Java通过jdbc连接hive
  6. 通过HiveServer2访问Hive
  7. SpringBoot连接Hive实现自助取数
  8. hive关联hbase表
  9. Hive udf 使用方法
  10. Hive基于UDF进行文本分词
  11. Hive窗口函数row number的用法
  12. 数据仓库之拉链表

公司运营免不了让我们数据做一些临时取数,这些取数有时候是重复的,或者可以做成可配置的。需要开发成界面,供他们选择,自然想到SpringBoot连接Hive,可以把取数做成一键生成,或者让他们自己写sql,通常大多人是不会sql的。

1. 需要的依赖配置

为了节省篇幅,这里给出hiveserver2方式连接hive主要的maven依赖,父工程springboot依赖省略。



    2.6.5
    3.2.7
    compile


    org.mybatis
    mybatis
    ${mybatis.version}




    org.apache.hadoop
    hadoop-common
    ${hadoop.version}
    ${scopeType}





    org.apache.hive
    hive-jdbc
    
        
            org.slf4j
            slf4j-api
        
        
            ch.qos.logback
            logback-core
        
        
            ch.qos.logback
            logback-classic
        
    
    1.2.1
    ${scopeType}




    org.jsoup
    jsoup
    1.8.3

application-test.yml配置数据库连接,这里用的是druid连接池管理hiveserver2连接,也是没有问题的。

# Spring配置
spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    druid:
      # 多数据源**省略若干***
      # hive数据源
      slave3:
        # 从数据源开关/默认关闭
        enabled: true
        driverClassName: org.apache.hive.jdbc.HiveDriver
        url: jdbc:hive2://cdh:10000/default
        username: bigdata
        password: bigdata

2. 代码实现

代码实现跟其它程序一样,都是mapperservicecontroller层,套路一模一样。一共设置了实时和离线两个yarn资源队列,由于其它部门人使用可能存在队列压力过大的情况,需要对数据量按照每次查询的数据范围不超过60天来限制,和此时集群使用资源不能大于55%,这里重点说明一下controller层对数据量的预防。

实体类UserModel

@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
public class UserModel extends BaseEntity{

    private String userId;
    private Integer count;
}

2.1 集群资源使用率不大于55%

因为很多业务查询逻辑controller都要用到数据量防御过大的问题,这里使用了被Spring切面关联的注解来标识controller

定义切面YarnResourceAspect,并且关联注解@YarnResource

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface YarnResource {

}

@Aspect
@Component
public class YarnResourceAspect {

    private static final Logger log = LoggerFactory.getLogger(YarnResourceAspect.class);

    /**
     * 配置切入点
     */
    @Pointcut("@annotation(com.ruoyi.common.annotation.YarnResource)")
    public void yarnResourcdPointCut(){
    }

    /**
     * 检查yarn的资源是否可用
     */
    @Before("yarnResourcdPointCut()")
    public void before(){
        log.info("************************************检查yarn的资源是否可用*******************************");
        // yarn资源紧张
        if(!YarnClient.yarnResourceOk()){
            throw new InvalidStatusException();
        }
    }

}

获取yarn的资源使用数据:

@Slf4j
public class YarnClient {

    /**
     * yarn资源不能超过多少
     */
    private static final int YARN_RESOURCE = 55;

    /**
     *
     * @return true : 表示资源正常, false: 资源紧张
     */
    public static boolean yarnResourceOk() {
        try {
            URL url = new URL("http://master:8088/cluster/scheduler");
            HttpURLConnection conn = null;
            conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.setUseCaches(false);
            // 请求超时5秒
            conn.setConnectTimeout(5000);
            // 设置HTTP头:
            conn.setRequestProperty("Accept", "*/*");
            conn.setRequestProperty("User-Agent", "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.111 Safari/537.36");
            // 连接并发送HTTP请求:
            conn.connect();

            // 判断HTTP响应是否200:
            if (conn.getResponseCode() != 200) {
                throw new RuntimeException("bad response");
            }
            // 获取所有响应Header:
            Map> map = conn.getHeaderFields();
            for (String key : map.keySet()) {
                System.out.println(key + ": " + map.get(key));
            }
            // 获取响应内容:
            InputStream input = conn.getInputStream();
            byte[] datas = null;

            try {
                // 从输入流中读取数据
                datas = readInputStream(input);
            } catch (Exception e) {
                e.printStackTrace();
            }
            String result = new String(datas, "UTF-8");// 将二进制流转为String

            Document document = Jsoup.parse(result);

            Elements elements = document.getElementsByClass("qstats");

            String[] ratios = elements.text().split("used");

            return Double.valueOf(ratios[3].replace("%", "")) < YARN_RESOURCE;
        } catch (IOException e) {
            log.error("yarn资源获取失败");
        }

        return false;

    }

    private static byte[] readInputStream(InputStream inStream) throws Exception {
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int len = 0;
        while ((len = inStream.read(buffer)) != -1) {
            outStream.write(buffer, 0, len);
        }
        byte[] data = outStream.toByteArray();
        outStream.close();
        inStream.close();
        return data;
    }
}

controller上通过注解@YarnResource标识:

@Controller
@RequestMapping("/hero/hive")
public class HiveController {

    /**
     * html 文件地址前缀
     */
    private String prefix = "hero";

    @Autowired
    IUserService iUserService;

    @RequestMapping("")
    @RequiresPermissions("hero:hive:view")
    public String heroHive(){
        return prefix + "/hive";
    }

    @YarnResource
    @RequestMapping("/user")
    @RequiresPermissions("hero:hive:user")
    @ResponseBody
    public TableDataInfo user(UserModel userModel){
        DateCheckUtils.checkInputDate(userModel);

        PageInfo pageInfo = iUserService.queryUser(userModel);
        TableDataInfo tableDataInfo = new TableDataInfo();

        tableDataInfo.setTotal(pageInfo.getTotal());
        tableDataInfo.setRows(pageInfo.getList());

        return tableDataInfo;
    }
}

2.2 查询数据跨度不超过60天检查

这样每次请求进入controller的时候就会自动检查查询的日期是否超过60天了,防止载入数据过多,引发其它任务资源不够。

public class DateCheckUtils {

    /**
     * 对前台传入过来的日期进行判断,防止查询大量数据,造成集群负载过大
     * @param o
     */
    public static void checkInputDate(BaseEntity o){
        if("".equals(o.getParams().get("beginTime")) && "".equals(o.getParams().get("endTime"))){
            throw new InvalidTaskException();
        }

        String beginTime = "2019-01-01";
        String endTime = DateUtils.getDate();

        if(!"".equals(o.getParams().get("beginTime"))){
            beginTime = String.valueOf(o.getParams().get("beginTime"));
        }

        if(!"".equals(o.getParams().get("endTime"))){
            endTime = String.valueOf(o.getParams().get("endTime"));
        }

        // 查询数据时间跨度大于两个月
        if(DateUtils.getDayBetween(beginTime, endTime) > 60){
            throw new InvalidTaskException();
        }
    }
}

这里访问hive肯定需要切换数据源的,因为其它页面还有对mysql的数据访问,需要注意一下。

目前功能看起来很简单,没有用到什么高大上的东西,后面慢慢完善。

ikeguang

这个人很懒,什么都没留下

文章评论