ETL转换指导:如何将金蝶数据整合至钉钉系统

  • 轻易云集成顾问-钟家寿

金蝶云星空数据集成钉钉:传给金蝶后,回传钉钉提示(已审核)④

当使用轻易云数据集成平台对接企业的不同系统时,我们需要确保每个环节都精准、高效地执行。本文分享一个经典的系统对接案例——将金蝶云星空中的业务数据集成到钉钉,实现信息的双向流动和实时通知。

背景描述

在本案例中,主要目标是通过轻易云的数据处理能力,将完成状态的业务单据从金蝶云星空拉取到,并将其自动传输到钉钉上进行审计确认。实现这个目标,只需调用两组API接口:

  • 获取金蝶云星空数据executeBillQuery
  • 写入到钉钉topapi/process/instance/comment/add

具体方案命名为“传给金蝶后,回传钉钉提示(已审核)④”,我们利用了几个关键技术特性来保证流程顺畅,包括高吞吐量的数据写入、定制化的数据转换逻辑以及集中监控和告警功能。

技术要点

  1. 高效的数据获取与写入

    • 我们首先配置了轻易云平台,通过executeBillQuery接口批量抓取金蝶云星空内特定条件下已完成审批的单据。这些单据信息会被快速、高效地导出,有赖于平台所支持的大规模并发请求能力。
  2. 自定义数据转换逻辑

    • 数据从源系统(金蝶云星空)提取后,需要根据业务需求做适当格式转化,以匹配目标系统(如扣除冗余字段或添加必要标识)。这一过程通过可视化编辑器设定,使开发人员能够快速制定合乎需求的转换规则,提高效率和准确度。
  3. 分页与限流控制策略

    • 在处理大体量数据时,为防止一次性请求导致性能瓶颈或接口受限,我们设置了分页机制及速率限制策略,有效避免API超载问题,同时确保所有需要处理的信息均能逐步、安全地得到操作。
  4. 可靠异常检测与重试机制

    • 集成过程难免遇见失败情境,如网络故障或者临时服务不可用等情况。采用集中监控和告警手段,可以实时跟踪任务状态,一旦发现错误即刻触发重试机制,从而保障最终一致性。
  5. 反馈结果至终端用户

    • 数据成功整合至企业管理后台后,还需即时通知相关人员,即把成功状态反馈回来。例如,通过调用` 金蝶与WMS系统接口开发配置

      调用金蝶云星空接口executeBillQuery获取并加工数据

在数据集成的生命周期中,第一步是从源系统获取数据。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空接口executeBillQuery来获取并加工数据。

接口配置与请求参数

首先,我们需要了解接口的基本配置和请求参数。根据提供的元数据配置,接口为executeBillQuery,请求方法为POST。以下是主要的请求参数:

  • FormId: 业务对象表单Id,必须填写金蝶的表单ID,例如:AP_PAYBILL
  • FieldKeys: 需查询的字段key集合
  • FilterString: 过滤条件,例如:FApproveDate>='{{LAST_SYNC_TIME|dateTime}}' and FDOCUMENTSTATUS='C'
  • Limit: 最大行数,用于分页
  • StartRow: 开始行索引,用于分页

请求字段解析

元数据配置中详细列出了需要查询的字段,包括但不限于:

  • FPAYBILLENTRY_FEntryID: 金蝶分录主键ID
  • FID: 实体主键
  • FBillNo: 单据编号
  • FDOCUMENTSTATUS: 单据状态
  • FCreatorId: 创建人
  • FAPPROVERID: 审核人
  • FCreateDate: 创建日期
  • FSETTLEORGID.FNumber: 结算组织编号

这些字段将用于构建查询请求,并且可以根据实际需求进行调整。

构建查询请求

根据元数据配置,我们可以构建一个完整的查询请求。以下是一个示例请求体:

{
  "FormId": "AP_PAYBILL",
  "FieldKeys": "FPAYBILLENTRY_FEntryID,FID,FBillNo,FDOCUMENTSTATUS,FCreatorId,FAPPROVERID,FCreateDate,FSETTLEORGID.FNumber",
  "FilterString": "FApproveDate>='2023-01-01' and FDOCUMENTSTATUS='C'",
  "Limit": 500,
  "StartRow": 0
}

在这个示例中,我们设置了分页参数Limit为500,表示每次查询最多返回500条记录;StartRow为0,表示从第0行开始查询。

数据处理与清洗

一旦成功获取到数据,下一步就是对数据进行处理和清洗。这一步骤包括但不限于:

  1. 数据格式转换:将金蝶返回的数据格式转换为目标系统所需的格式。
  2. 字段映射:根据业务需求,对字段进行重新命名或映射。
  3. 数据过滤:进一步过滤不符合业务逻辑的数据。

例如,如果我们需要将金蝶返回的数据转换为JSON格式,并且只保留已审核的数据,可以使用以下代码片段:

import json

def process_data(response):
    data = json.loads(response)
    processed_data = []
    for record in data:
        if record['FDOCUMENTSTATUS'] == 'C':
            processed_record = {
                'entry_id': record['FPAYBILLENTRY_FEntryID'],
                'bill_no': record['FBillNo'],
                'status': record['FDOCUMENTSTATUS'],
                'creator': record['FCreatorId'],
                'approver': record['FAPPROVERID'],
                'create_date': record['FCreateDate'],
                'settle_org': record['FSETTLEORGID.FNumber']
            }
            processed_data.append(processed_record)
    return json.dumps(processed_data, ensure_ascii=False)

异常处理与重试机制

在实际操作中,可能会遇到各种异常情况,如网络超时、接口返回错误等。因此,需要设计合理的异常处理与重试机制。例如,可以在捕获异常后进行多次重试,并记录错误日志以便后续排查。

import requests
import time

def fetch_data(url, payload, retries=3):
    for attempt in range(retries):
        try:
            response = requests.post(url, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            time.sleep(2 ** attempt) # 指数退避策略
    raise Exception("Failed to fetch data after multiple attempts")

url = "https://api.kingdee.com/executeBillQuery"
payload = {
    "FormId": "AP_PAYBILL",
    "FieldKeys": "FPAYBILLENTRY_FEntryID,FID,FBillNo,FDOCUMENTSTATUS,FCreatorId,FAPPROVERID,FCreateDate,FSETTLEORGID.FNumber",
    "FilterString": "FApproveDate>='2023-01-01' and FDOCUMENTSTATUS='C'",
    "Limit": 500,
    "StartRow": 0
}

data = fetch_data(url, payload)
processed_data = process_data(data)
print(processed_data)

通过上述步骤,我们可以高效地调用金蝶云星空接口获取并加工数据,为后续的数据集成和分析打下坚实基础。 打通钉钉数据接口

轻易云数据集成平台生命周期第二步:ETL转换与写入钉钉API接口

在数据集成过程中,ETL(Extract, Transform, Load)是至关重要的一环。本文将重点探讨如何使用轻易云数据集成平台,将已集成的源平台数据进行ETL转换,并转为目标平台钉钉API接口所能接收的格式,最终写入目标平台。

数据请求与清洗

在数据集成的初始阶段,我们已经从源系统(金蝶)获取了原始数据,并进行了必要的清洗和预处理。这一步骤确保了数据的准确性和一致性,为后续的转换和写入奠定了基础。

数据转换与写入

接下来,我们进入生命周期的第二步,即将清洗后的数据进行转换,并通过钉钉API接口写入目标平台。以下是具体操作步骤及技术细节:

元数据配置解析

根据提供的元数据配置,我们需要向钉钉API接口发送一个POST请求,具体配置如下:

{
  "api": "topapi/process/instance/comment/add",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "request",
      "label": "请求对象",
      "type": "object",
      "children": [
        {
          "field": "process_instance_id",
          "label": "审批实例ID",
          "type": "string",
          "describe": "可通过调用获取审批实例ID列表接口获取。",
          "value": "_findCollection find id from a9138482-05d7-36b8-a20e-2f3bdb3f90fb where business_id={FBillNo}",
          "parent": "request"
        },
        {
          "field": "text",
          "label": "评论的内容",
          "type": "string",
          "value": "已审核",
          "parent": "request"
        },
        {
          "field": "comment_userid",
          "label": "评论人的userid",
          "type": "string",
          "value": "112018120420563028",
          "parent": "request"
        }
      ]
    }
  ]
}
配置解析与应用
  1. API Endpointtopapi/process/instance/comment/add

    • 这是我们需要调用的钉钉API接口,用于添加审批实例评论。
  2. HTTP MethodPOST

    • 我们将使用POST方法发送请求。
  3. ID Checktrue

    • 表示需要对ID进行校验,确保唯一性和有效性。
  4. Request Payload

    • process_instance_id: 从金蝶系统中获取审批实例ID,通过查询语句 _findCollection find id from a9138482-05d7-36b8-a20e-2f3bdb3f90fb where business_id={FBillNo} 获取。
    • text: 评论内容,固定为“已审核”。
    • comment_userid: 评论人的用户ID,固定为“112018120420563028”。
实现步骤
  1. 提取审批实例ID

    • 首先,从金蝶系统中提取业务单据号(FBillNo),并通过查询语句获取对应的审批实例ID。
  2. 构建请求对象

    • 根据元数据配置构建请求对象,包括process_instance_idtextcomment_userid字段。
  3. 发送HTTP POST请求

    • 使用构建好的请求对象,通过HTTP POST方法调用钉钉API接口。

以下是示例代码片段,展示如何实现上述步骤:

import requests
import json

# 提取审批实例ID
def get_process_instance_id(fbill_no):
    query = f"_findCollection find id from a9138482-05d7-36b8-a20e-2f3bdb3f90fb where business_id={fbill_no}"
    # 假设有一个函数execute_query来执行查询并返回结果
    process_instance_id = execute_query(query)
    return process_instance_id

# 构建请求对象
def build_request_object(process_instance_id):
    request_object = {
        'request': {
            'process_instance_id': process_instance_id,
            'text': '已审核',
            'comment_userid': '112018120420563028'
        }
    }
    return request_object

# 发送HTTP POST请求
def send_post_request(api_endpoint, request_object):
    url = f"https://oapi.dingtalk.com/{api_endpoint}"
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, headers=headers, data=json.dumps(request_object))
    return response.json()

# 主流程
def main(fbill_no):
    process_instance_id = get_process_instance_id(fbill_no)
    request_object = build_request_object(process_instance_id)
    response = send_post_request("topapi/process/instance/comment/add", request_object)
    print(response)

# 示例调用
main("example_fbill_no")

通过以上步骤,我们成功地将从金蝶系统获取的数据进行了ETL转换,并通过钉钉API接口写入了目标平台。这不仅实现了不同系统间的数据无缝对接,也提升了业务流程的自动化程度。 数据集成平台API接口配置

更多系统对接方案