python注册钉钉回调事件的实现

来自:互联网
时间:2021-08-11
阅读:
目录
1、注册端
2、回调端:以下示例代码为python2,django 框架

钉钉API文档:https://ding-doc.dingtalk.com/doc#/serverapi2/skn8ld

钉钉有回调事件流程,有哪些回调?比如:通讯录回调、审批回调等等,拿通讯录回调来说,就是当你公司组织架构发生变动时,会自动触发你自己注册的回调地址,然后根据回调信息做一些自定义的处理,不得不说,钉钉真的是解决了协同办公的很多问题,非常nice,但就回调事件来说,每个企业只能注册一个回调地址,即使你要监听的是不同的事件、即使还有其他业务线需要用到回调,也只能不多于一个回调,当然这都是出于人家服务多方面考虑的设计,废话不多说,

好,流程:

  一个注册端向钉钉发送注册请求,钉钉callback你自己的服务url,必须是公网访问的地址,并且该地址返回钉钉json数据来告诉钉钉回调成功

  注册成功之后这个地址就可以接收钉钉的回调post请求,做一些自定义处理,记得返回json

好,代码

1、注册端

mport requests, json


class DingDingCallBack(object):
    def __init__(self):
        self.appsecret=''
        self.appkey=''
        self.api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s"%(self.appkey,self.appsecret)
        self.aes_key = ""
        self.callbackUrl = "回调url"
        self.headers = {
            'Content-Type': 'application/json',
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.96 Safari/537.36',
            'username':'zhangsan',
            'password':'123456'
        }

    def get_token(self):
        res = requests.get(self.api_url)
        if res.status_code == 200:
            str_res = res.text
            token = (json.loads(str_res)).get('access_token')
            return token

    def regist_call_back(self):
        url = 'https://oapi.dingtalk.com/call_back/register_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
        data = {
            "call_back_tag": ["bpms_task_change", "bpms_instance_change"], # 审批回调
            "token": "qxN3cm",
            "aes_key": self.aes_key,
            "url":self.callbackUrl,
            }
        data1 = json.dumps(data)
        response = requests.post(url, headers=self.headers, data=data1)
        print(response)
        print(response.text)

    def query_callback(self):
        url = 'https://oapi.dingtalk.com/call_back/get_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
        response = requests.get(url, headers=self.headers, )
        print(response)
        print(response.text)


    def update_call_back(self):
        url = 'https://oapi.dingtalk.com/call_back/update_call_back?access_token={}'.format(self.get_token())
        data = {
            "call_back_tag": ["bpms_task_change", "bpms_instance_change","user_add_org","user_leave_org","org_dept_create","org_dept_modify","org_dept_remove"],
            "token": "自定义字符串",
            "aes_key": self.aes_key,
            "url":self.callbackUrl
            }
        data1 = json.dumps(data)
        response = requests.post(url, headers=self.headers, data=data1)
        print(response)
        print(response.text)

    def get_fail(self):
        url = 'https://oapi.dingtalk.com/call_back/get_call_back_failed_result?access_token={}'.format(self.get_token())
        response = requests.get(url, headers=self.headers, )
        print(response.text)# todo 删除回调
    def delete_callback(self):
        url = 'https://oapi.dingtalk.com/call_back/delete_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
        result = requests.get(url)
        print(result)
        print(result.text)

#
if __name__ == '__main__':
    dingOBJ = DingDingCallBack()
    # todo 获取钉钉token
#     # dingOBJ.get_token()
    # todo 获取回调失败
#     dingOBJ.get_fail()
    # todo 注册回调
    # dingOBJ.regist_call_back()
    # todo 查询回调事件
    dingOBJ.query_callback()
    # todo 修改回调事件
#     dingOBJ.update_call_back()
    # todo 删除回调
    # dingOBJ.delete_callback()

2、回调端:以下示例代码为python2,django 框架

# -*- coding:utf-8 -*-

from django.shortcuts import render
from django.http import JsonResponse,HttpResponse
from django.views.generic import View
from django.views.decorators.csrf import csrf_exempt
from DingCrypto import DingCrypto
import random,string,time,json,requests,simplejson
# Create your views here.from ast import literal_eval


encodingAesKey = ''
key = ''
token = '自定义字符串'
appsecret = ''
appkey = ''

api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s"%(appkey,appsecret)


def randam_string(n):
    ran_str = ''.join(random.sample(string.ascii_letters + string.digits, n))
    return ran_str

# 注册回调
@csrf_exempt
def workOrderCallback(request):
    if request.method == 'GET':
        return JsonResponse({'code':200,'msg':'ok'})


    if request.method == 'POST':
        print request.GET
        dingCrypto = DingCrypto(encodingAesKey,key)
        nonce = randam_string(8)
        timestamp = str(int(round(time.time())))
        encrpyt = dingCrypto.encrypt('success')
        # print nonce,timestamp,token,encrpyt
        signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt)
        new_data = {
            'data': {
                'msg_signature': signature,
                'timeStamp': timestamp,
                'nonce': nonce,
                'encrypt': encrpyt
            },
        }


        encrpyt11 = dingCrypto.decrypt(encrpyt)

        return JsonResponse(new_data.get('data'))

# 响应回调
class CallBack(View):
    def get(self,request,*args,**kwargs):
        return JsonResponse({'code':200,'msg':'ok'})

    def get_token(self):
        res = requests.get(api_url)
        if res.status_code == 200:
            str_res = res.text
            token = (json.loads(str_res)).get('access_token')
            return token


    def post(self,request,*args,**kwargs):

        # data = request.GET

        dingCrypto = DingCrypto(encodingAesKey, key)
        nonce = randam_string(8)
        timestamp = str(int(round(time.time())))
        encrpyt = dingCrypto.encrypt('success')
        # signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt)
        callback = json.loads(dingCrypto.decrypt(json.loads(request.body).get('encrypt')))
        if callback['EventType'] == 'bpms_instance_change': # 审批实例开始,结束
            url = 'https://oapi.dingtalk.com/topapi/processinstance/get?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
            instace_ = {
                "process_instance_id": callback['processInstanceId']
            }
            data2 = json.dumps(instace_)
            req = requests.post(url,data=data2)
            data = literal_eval(str(req.text)).get('process_instance')
            excute_workorder(callback['processInstanceId'],data)
        elif callback['EventType'] == 'bpms_task_change':  # 审批任务开始,结束,转交
            print 'bpms_task_change'
        elif callback['EventType'] == 'user_add_org':
            print '用户增加'elif callback['EventType'] == 'user_leave_org':
            print '用户离职'
        elif callback['EventType'] == 'org_dept_create':
            print '组织架构添加'elif callback['EventType'] == 'org_dept_modify':
            print '组织架构变更'elif callback['EventType'] == 'org_dept_remove':
            print '组织架构删除'return HttpResponse(encrpyt)
返回顶部
顶部