吉客云数据集成到金蝶云星空案例分享:JY-BDS-盘亏单
在本文中,我们将聚焦于一个复杂而高效的系统对接集成案例——如何使用轻易云数据集成平台将吉客云的数据无缝集成到金蝶云星空。具体方案名称为 JY-BDS-盘亏单,通过这个方案我们实现了从吉客云接口(wms.stocktake.get)抓取数据,并通过金蝶云星空的 API 接口(batchSave)进行批量写入。
高吞吐量的数据写入能力
轻易云数据集成平台带来的高吞吐量优势使得大量盘亏单数据能够迅速被导入到吉客云,同时也能快速传输至金蝶云星空。这种处理能力极大地提升了业务的时效性,确保在短时间内完成大量订单的数据迁移和处理。
实时监控与告警系统
整个任务流程中,集中化的监控和告警系统实时跟踪和记录每个环节的数据流转状态,使得我们可以第一时间发现并解决任何异常。这一特性,对维护系统稳定运行以及优化性能有着不可忽视的重要作用。
数据质量监控与异常检测
为了确保所有从吉客云获取的数据没有丢失或误差,平台提供了强大的数据质量监控与异常检测功能,这些工具帮助我们及时查找、定位并纠正潜在的问题,从源头上保证了最终写入金蝶云星空系统中的信息准确无误。
结合自定义转换逻辑应对格式差异
由于两个系统之间存在不同的数据结构需求,自定义数据转换逻辑是必需品。在此次项目中,这一功能协助我们顺利地处理了吉客 云接口分页及限流问题,实现多个表格字段自动映射,大大降低手工干预成本,提高效率。
通过以上技术要点解析,我们剖析了一套完整、高效、可靠且可追溯性的方案。接下来,将进一步深入探讨具体技术实现细节,包括API调用参数配置、分页机制应对策略以及错误重试机制等内容。
调用吉客云接口wms.stocktake.get获取并加工数据
在轻易云数据集成平台的生命周期中,调用源系统接口是关键的第一步。本文将详细探讨如何通过吉客云的wms.stocktake.get
接口获取盘亏单数据,并进行初步的数据加工。
接口调用配置
根据元数据配置,我们需要通过POST方法调用吉客云的wms.stocktake.get
接口。以下是具体的请求参数配置:
- api:
wms.stocktake.get
- method:
POST
- number:
stocktakeId
- id:
id
- pagination:
pageSize
: 1
- idCheck: true
请求字段包括:
- warehouseCode(仓库编号): 例如 "123456"
- skuBarcode(条码 支持批量查询)
- pageSize(条目): 固定值 "20"
- pageIndex(页码)
- startPdDate(盘点时间-开始): 使用函数
_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 2 DAY),'%Y-%m-%d 00:00:00')
获取两天前的日期 - endPdDate(盘点时间-结束): 使用当前时间
{{CURRENT_TIME|datetime}}
请求示例
以下是一个完整的请求示例,展示了如何构建POST请求以获取所需数据:
{
"warehouseCode": "123456",
"skuBarcode": "",
"pageSize": "20",
"pageIndex": "1",
"startPdDate": "_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 2 DAY),'%Y-%m-%d 00:00:00')",
"endPdDate": "{{CURRENT_TIME|datetime}}"
}
数据清洗与转换
在获取到原始数据后,下一步是进行数据清洗和转换。这一步骤确保数据符合目标系统的要求,并去除冗余信息。以下是一些常见的数据清洗和转换操作:
-
字段映射与重命名 将源系统中的字段映射到目标系统中的字段。例如,将
stocktakeId
映射为目标系统中的id
。 -
数据类型转换 确保所有字段的数据类型符合目标系统的要求。例如,将日期字符串转换为标准日期格式。
-
过滤无效数据 去除重复或无效的数据记录。例如,检查每条记录是否包含有效的仓库编号和条码。
-
分页处理 根据元数据配置中的分页信息,逐页获取并处理数据,以确保完整性和一致性。
实时监控与调试
在轻易云平台上,实时监控和调试功能可以帮助我们快速发现并解决问题。通过可视化界面,我们可以查看每个步骤的数据流动情况,并及时调整配置以优化性能。
实践案例
假设我们需要获取某一仓库在过去两天内的所有盘亏单信息,并将其导入到目标系统中。以下是具体操作步骤:
- 配置API请求参数,如上所述。
- 调用吉客云接口,获取原始盘亏单数据。
- 对原始数据进行清洗和转换,包括字段映射、类型转换和无效数据过滤。
- 将处理后的数据写入目标系统。
通过这些步骤,我们可以高效地完成从源系统到目标系统的数据集成任务,实现不同系统间的数据无缝对接。
轻易云数据集成平台:将源数据ETL转换并写入金蝶云星空API接口
在轻易云数据集成平台中,数据处理的全生命周期管理涵盖了从数据请求与清洗到数据转换与写入的各个环节。本文将重点探讨如何将已经集成的源平台数据进行ETL转换,并转为金蝶云星空API接口所能够接收的格式,最终写入目标平台。
元数据配置解析
在进行ETL转换之前,我们需要详细了解元数据配置。以下是一个典型的元数据配置示例:
{
"api": "batchSave",
"method": "POST",
"idCheck": true,
"operation": {
"rowsKey": "array",
"rows": 1,
"method": "batchArraySave"
},
"request": [
{
"label": "单据编号",
"field": "FBillNo",
"type": "string",
"value": "{stocktakeId}"
},
{
"label": "库存组织",
"field": "FStockOrgId",
...
}
...
],
...
}
数据请求与清洗
在这个阶段,我们从源系统中获取盘亏单的数据,并进行初步清洗和验证。这一步骤确保了我们获得的数据是准确且完整的,为后续的ETL转换打下基础。
数据转换与写入
数据映射与转换
-
单据编号 (FBillNo):
- 源字段:
stocktakeId
- 转换规则:直接映射,无需额外处理。
- 源字段:
-
库存组织 (FStockOrgId):
- 源字段:
warehouseCode
- 转换规则:使用
_findCollection
方法,通过warehouseCode
查找对应的库存组织ID,并使用ConvertObjectParser
进行解析。
- 源字段:
-
日期 (FDate):
- 源字段:
stocktakeDate
- 转换规则:使用函数
FROM_UNIXTIME
将Unix时间戳转换为标准日期格式。
- 源字段:
-
单据类型 (FBillTypeID):
- 固定值:
PK01_SYS
- 转换规则:直接映射,并使用
ConvertObjectParser
进行解析。
- 固定值:
-
部门 (FDeptId):
- 源字段:
warehouseCode
- 转换规则:类似于库存组织,通过查找和解析获取部门ID。
- 源字段:
-
货主类型 (FOwnerTypeIdHead):
- 固定值:
BD_OwnerOrg
- 转换规则:直接映射,无需额外处理。
- 固定值:
-
明细信息 (FBillEntry):
- 包含多个子字段,如物料编码、仓库、盘点数量、账存数量等。
- 每个子字段都有其对应的源字段和转换规则。例如,物料编码通过解析器将物料编号转为目标系统所需格式;盘点数量和账存数量则通过简单的数据类型转换(如取整)。
-
吉客云单据编号 (F_JKBillNo):
- 源字段:
stocktakeId
- 转换规则:直接映射,无需额外处理。
- 源字段:
API调用与写入
在完成所有必要的数据转换后,我们需要将这些数据通过API接口写入金蝶云星空。以下是一个典型的API调用示例:
{
"FormId": "STK_StockCountLoss",
"IsVerifyBaseDataField": false,
"Operation": "Save",
"IsAutoSubmitAndAudit": true,
...
}
FormId
: 指定业务对象表单ID,如盘亏单对应的是STK_StockCountLoss
。IsVerifyBaseDataField
: 设置是否验证基础资料有效性,通常设置为false。Operation
: 执行操作类型,这里为保存操作。IsAutoSubmitAndAudit
: 是否自动提交并审核,设置为true以简化流程。
实际案例应用
在实际应用中,我们可以通过以下步骤实现整个ETL过程:
- 从源系统获取盘亏单数据。
- 根据元数据配置进行字段映射和转换。
- 构建符合金蝶云星空API接口要求的数据结构。
- 使用HTTP POST方法调用金蝶云星空API,将数据提交至目标平台。
例如:
import requests
import json
# 构建请求体
payload = {
# 根据元数据配置构建的数据结构
}
# API调用
response = requests.post(
url="https://api.kingdee.com/batchSave",
headers={"Content-Type": "application/json"},
data=json.dumps(payload)
)
# 检查响应状态
if response.status_code == 200:
print("Data successfully written to Kingdee Cloud")
else:
print(f"Failed to write data: {response.text}")
通过上述步骤,我们可以高效地完成从源系统到目标平台的数据集成,实现业务流程的自动化和优化。