Go语言kylin任务自动化实例详解

来自:网络
时间:2022-06-08
阅读:
目录

前言

Go语言kylin任务自动化实例详解

kylin是一个开源的OLAP分析引擎,具有亚秒级查询大表的能力

通过kylin提供的cube预构建功能,省去了不断写sql查询hive的麻烦,强化了任务统一管理和结果快速呈现的效果

kylin官网: https://kylin.apache.org/cn/

任务

当kylin集群比较大,和有多个kylin集群时,说明cube也越来越多,几百上千个cube便是常用便饭了

这些任务的运行就成了难题,人工去界面上点点点完全不实现了。此时就需要做成自动化周期性的任务

因为官方没有提供Go的客户端,只提供了http的api请求。下列例子使用Go中的http包来实现自动化任务

自动化实现

初始化

使用第三方http包(HttpRequest)来做http相关的请求,该包支持GET,POST,DELETE,PUT等四种请求方法,正好完全满足请求kylin的要求

var (
   url = "http://ip:7070/kylin/"
   username = "ADMIN"
   password = "Password"
   req *HttpRequest.Request
)
func init() {
   req = HttpRequest.NewRequest().Debug(false).SetTimeout(time.Second*5).
      SetHeaders(map[string]string{
         "Content-Type": "application/json;charset=utf-8",
      }).SetBasicAuth(username, password)
}

cube提交build

该方法接收三个参数,需要构建的cube名称,以及开始时间戳和结束时间戳

调用示例:

cubeBuild("dwd_jd_order","1637193600000","1637280000000")

时间戳获取方法,在第6小节

func cubeBuild(cube,startTime,endTime string) {
   m := map[string]string{
      "startTime": startTime,
      "endTime":   endTime,
      "buildType": "BUILD",
   }
   resp, err := req.JSON().Put(url+"api/cubes/"+cube+"/build", m)
   if err != nil {
      fmt.Println("cube构建请求错误: ", err)
   }
   if resp.StatusCode() != 200 {
      fmt.Println("cube构建状态码不符期望: ",resp.StatusCode())
   }
}

cube运行结果检查

检查cube运行结果,是成功还是失败了,还提供一个重新构建开关,如果cube失败,调用重构

kylin job检查接口属性说明

jobSearchMode 搜索模式(检查点和cubeing两种) ALL所有模式的数据

limit 限制返回条数

offset 位置(0是从第一条开始)

status 状态类型(8是错误类型,0是new,1是pending,2是running,32是stopped,4是finished,16是discarded)

timeFilter 时间范围过滤(1是一天,2是一周,3是一月,4是一年,5是全部)

调用示例: jobCheck(false)

为什么要在检查里面调重构方法,是因为重构cube需要拿到uuid,但uuid只能在这个接口中获取到,且uuid不是固定的,需要运cube运行后才可得到

func jobCheck(resumeSwitch bool) {
   resp, err := req.Get(url+"api/jobs?jobSearchMode=ALL&limit=15&offset=0&status=8&timeFilter=1")
   if err != nil {
      fmt.Println("job检查请求错误: ", err)
   }
   if resp.StatusCode() != 200 {
      fmt.Println("job检查状态码不符期望: ",resp.StatusCode())
   }
   body, _ := resp.Body()
   var i interface{}
   json.Unmarshal(body,&i)
   uuid, err := jmespath.Search("[0].uuid", i)
   if err != nil {
      fmt.Println("search err: ",err)
   }
   fmt.Println(uuid)
   if resumeSwitch {
      cubeResume("uuid")
   }
}

重构cube

重构cube在job失败后,自动构建非常有用,避免人工频繁介入到这些工作中,是自动化中关键一步

调用示例: cubeResume("uuid")

func cubeResume(uuid string)  {
   resp, err := req.Put(url+"api/jobs/"+uuid+"/resume")
   if err != nil {
      fmt.Println("cube重新build请求错误: ", err)
   }
   if resp.StatusCode() != 200 {
      fmt.Println("cube重新build状态码不符期望: ",resp.StatusCode())
   }
}

历史job清理

kylin在运行一段时间后,就会产生很多冗余,且时需要周期性的清理这些历史job

调用示例: jobHistoryDelete("uuid")

需要先检查job,获取uuid,然后再删除历史job

func jobHistoryDelete(uuid string) {
   resp, err := req.Delete(url+"api/jobs/"+uuid+"/drop")
   if err != nil {
      fmt.Println("历史job清理请求错误: ", err)
   }
   if resp.StatusCode() != 200 {
      fmt.Println("历史job清理状态码不符期望: ",resp.StatusCode())
   }
}

时间戳

kylin要求的时间毫秒,这里使用纳秒时间戳方法除一下就得到了毫秒

func timestamp()  {
   year := time.Now().Year()
   month := time.Now().Month()
   day := time.Now().Day()
   //今天的时间戳
   today := time.Date(year, month, day, 8, 0, 0, 0, time.Local).UnixNano() / 1e6
   fmt.Println(today)
   //昨天的时间戳
   iDay := time.Now().AddDate(0, 0, -1).Day()
   yesterday := time.Date(year, month, iDay, 8, 0, 0, 0, time.Local).UnixNano() / 1e6
   fmt.Println(yesterday)
}

小结

以上方法配合定时任务,就可以实现kylin自动化运维工作了,当然kylin官网还提供了更多接口,有需求的同学可以看看

传送门: https://kylin.apache.org/cn/docs31/howto/howto_use_restapi.html

更多关于Go语言kylin任务自动化的资料请关注其它相关文章!

返回顶部
顶部