等待下一个秋

  • Spark
  • Flink
  • Hive
  • 数据仓库
  • ClickHouse
  • 收徒弟
  • Java
    • Spring
    • Mybatis
    • SpringBoot
    • 面试题
  • Python
    • Python基础
    • 爬虫
    • Numpy
    • matplotlib
    • Flask
  • 技术杂谈
    • Linux知识
    • Docker
    • Git教程
    • Redis教程
    • mysql
    • 前端
    • R语言
    • 机器学习
  • 关于我
  • 其它
    • 副业挣钱
    • 资料下载
    • 资料文档
专注于Hadoop/Spark/Flink/Hive/数据仓库等
关注公众号:大数据技术派,获取更多学习资料。
  1. 首页
  2. Hive
  3. 正文

SpringBoot连接Hive实现自助取数

2020年11月5日 9967点热度 1人点赞 0条评论

Hive系列文章

  1. Hive表的基本操作
  2. Hive中的集合数据类型
  3. Hive动态分区详解
  4. hive中orc格式表的数据导入
  5. Java通过jdbc连接hive
  6. 通过HiveServer2访问Hive
  7. SpringBoot连接Hive实现自助取数
  8. hive关联hbase表
  9. Hive udf 使用方法
  10. Hive基于UDF进行文本分词
  11. Hive窗口函数row number的用法
  12. 数据仓库之拉链表

公司运营免不了让我们数据做一些临时取数,这些取数有时候是重复的,或者可以做成可配置的。需要开发成界面,供他们选择,自然想到SpringBoot连接Hive,可以把取数做成一键生成,或者让他们自己写sql,通常大多人是不会sql的。

1. 需要的依赖配置

为了节省篇幅,这里给出hiveserver2方式连接hive主要的maven依赖,父工程springboot依赖省略。

<!-- 版本信息 -->
<properties>
    <hadoop.version>2.6.5</hadoop.version>
    <mybatis.version>3.2.7</mybatis.version>
    <scopeType>compile</scopeType>
</properties>
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis</artifactId>
    <version>${mybatis.version}</version>
</dependency>

<!-- hadoop依赖 -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <scope>${scopeType}</scope>
</dependency>

<!-- hive-jdbc -->
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
    <version>1.2.1</version>
    <scope>${scopeType}</scope>
</dependency>

<!-- 解析html -->
<dependency>
    <groupId>org.jsoup</groupId>
    <artifactId>jsoup</artifactId>
    <version>1.8.3</version>
</dependency>

application-test.yml配置数据库连接,这里用的是druid连接池管理hiveserver2连接,也是没有问题的。

# Spring配置
spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    druid:
      # 多数据源**省略若干***
      # hive数据源
      slave3:
        # 从数据源开关/默认关闭
        enabled: true
        driverClassName: org.apache.hive.jdbc.HiveDriver
        url: jdbc:hive2://cdh:10000/default
        username: bigdata
        password: bigdata

2. 代码实现

代码实现跟其它程序一样,都是mapper、service、controller层,套路一模一样。一共设置了实时和离线两个yarn资源队列,由于其它部门人使用可能存在队列压力过大的情况,需要对数据量按照每次查询的数据范围不超过60天来限制,和此时集群使用资源不能大于55%,这里重点说明一下controller层对数据量的预防。

实体类UserModel:

@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
public class UserModel extends BaseEntity{

    private String userId;
    private Integer count;
}

2.1 集群资源使用率不大于55%

因为很多业务查询逻辑controller都要用到数据量防御过大的问题,这里使用了被Spring切面关联的注解来标识controller。

定义切面YarnResourceAspect,并且关联注解@YarnResource

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface YarnResource {

}

@Aspect
@Component
public class YarnResourceAspect {

    private static final Logger log = LoggerFactory.getLogger(YarnResourceAspect.class);

    /**
     * 配置切入点
     */
    @Pointcut("@annotation(com.ruoyi.common.annotation.YarnResource)")
    public void yarnResourcdPointCut(){
    }

    /**
     * 检查yarn的资源是否可用
     */
    @Before("yarnResourcdPointCut()")
    public void before(){
        log.info("************************************检查yarn的资源是否可用*******************************");
        // yarn资源紧张
        if(!YarnClient.yarnResourceOk()){
            throw new InvalidStatusException();
        }
    }

}

获取yarn的资源使用数据:

@Slf4j
public class YarnClient {

    /**
     * yarn资源不能超过多少
     */
    private static final int YARN_RESOURCE = 55;

    /**
     *
     * @return true : 表示资源正常, false: 资源紧张
     */
    public static boolean yarnResourceOk() {
        try {
            URL url = new URL("http://master:8088/cluster/scheduler");
            HttpURLConnection conn = null;
            conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.setUseCaches(false);
            // 请求超时5秒
            conn.setConnectTimeout(5000);
            // 设置HTTP头:
            conn.setRequestProperty("Accept", "*/*");
            conn.setRequestProperty("User-Agent", "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.111 Safari/537.36");
            // 连接并发送HTTP请求:
            conn.connect();

            // 判断HTTP响应是否200:
            if (conn.getResponseCode() != 200) {
                throw new RuntimeException("bad response");
            }
            // 获取所有响应Header:
            Map<String, List<String>> map = conn.getHeaderFields();
            for (String key : map.keySet()) {
                System.out.println(key + ": " + map.get(key));
            }
            // 获取响应内容:
            InputStream input = conn.getInputStream();
            byte[] datas = null;

            try {
                // 从输入流中读取数据
                datas = readInputStream(input);
            } catch (Exception e) {
                e.printStackTrace();
            }
            String result = new String(datas, "UTF-8");// 将二进制流转为String

            Document document = Jsoup.parse(result);

            Elements elements = document.getElementsByClass("qstats");

            String[] ratios = elements.text().split("used");

            return Double.valueOf(ratios[3].replace("%", "")) < YARN_RESOURCE;
        } catch (IOException e) {
            log.error("yarn资源获取失败");
        }

        return false;

    }

    private static byte[] readInputStream(InputStream inStream) throws Exception {
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int len = 0;
        while ((len = inStream.read(buffer)) != -1) {
            outStream.write(buffer, 0, len);
        }
        byte[] data = outStream.toByteArray();
        outStream.close();
        inStream.close();
        return data;
    }
}

在controller上通过注解@YarnResource标识:

@Controller
@RequestMapping("/hero/hive")
public class HiveController {

    /**
     * html 文件地址前缀
     */
    private String prefix = "hero";

    @Autowired
    IUserService iUserService;

    @RequestMapping("")
    @RequiresPermissions("hero:hive:view")
    public String heroHive(){
        return prefix + "/hive";
    }

    @YarnResource
    @RequestMapping("/user")
    @RequiresPermissions("hero:hive:user")
    @ResponseBody
    public TableDataInfo user(UserModel userModel){
        DateCheckUtils.checkInputDate(userModel);

        PageInfo pageInfo = iUserService.queryUser(userModel);
        TableDataInfo tableDataInfo = new TableDataInfo();

        tableDataInfo.setTotal(pageInfo.getTotal());
        tableDataInfo.setRows(pageInfo.getList());

        return tableDataInfo;
    }
}

2.2 查询数据跨度不超过60天检查

这样每次请求进入controller的时候就会自动检查查询的日期是否超过60天了,防止载入数据过多,引发其它任务资源不够。

public class DateCheckUtils {

    /**
     * 对前台传入过来的日期进行判断,防止查询大量数据,造成集群负载过大
     * @param o
     */
    public static void checkInputDate(BaseEntity o){
        if("".equals(o.getParams().get("beginTime")) && "".equals(o.getParams().get("endTime"))){
            throw new InvalidTaskException();
        }

        String beginTime = "2019-01-01";
        String endTime = DateUtils.getDate();

        if(!"".equals(o.getParams().get("beginTime"))){
            beginTime = String.valueOf(o.getParams().get("beginTime"));
        }

        if(!"".equals(o.getParams().get("endTime"))){
            endTime = String.valueOf(o.getParams().get("endTime"));
        }

        // 查询数据时间跨度大于两个月
        if(DateUtils.getDayBetween(beginTime, endTime) > 60){
            throw new InvalidTaskException();
        }
    }
}

这里访问hive肯定需要切换数据源的,因为其它页面还有对mysql的数据访问,需要注意一下。

目前功能看起来很简单,没有用到什么高大上的东西,后面慢慢完善。

标签: Hive Java SpringBoot 大数据
最后更新:2022年1月22日

等待下一个秋

待我代码写成,便娶你为妻!专注于Hadoop/Spark/Flink/Hive/数据仓库等,关注公众号:大数据技术派,获取更多学习资料。

打赏 点赞
< 上一篇
下一篇 >

文章评论

取消回复

等待下一个秋

待我代码写成,便娶你为妻!专注于Hadoop/Spark/Flink/Hive/数据仓库等,关注公众号:大数据技术派,获取更多学习资料。

搜一搜
微信
最新 热点 随机
最新 热点 随机
ChatGPT可以做什么 ClickHouse 自定义分区键 ClickHouse数据副本引擎 ClickHouse ReplacingMergeTree引擎 ClickHouse MergeTree引擎 clickhouse简介
Python作图模块之turtle R语言常用函数 第10讲:Flink Side OutPut 分流 数仓建模—ID Mapping wordpress安装常见问题 Matplotlib pyplot 教程
标签聚合
挣钱 Hive mysql 算法 Redis R语言 数据仓库 大数据 Java 书籍 Python Flink
文章归档
  • 2023年2月
  • 2022年12月
  • 2022年11月
  • 2022年9月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年10月
  • 2021年9月
  • 2021年8月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年5月
  • 2020年4月
  • 2020年1月
  • 2019年9月
  • 2019年8月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年1月
  • 2018年12月
  • 2017年5月

©2022 ikeguang.com. 保留所有权利。

鄂ICP备2020019097号-1

鄂公网安备 42032202000160号