金蝶云星空数据集成到MySQL实战案例:MOM-PUR-采购订单明细状态更新
在本次技术分享中,我们将聚焦于金蝶云星空与MySQL的数据对接集成,以实现MOM-PUR-采购订单明细的状态更新,包括冻结、终止和关闭等操作。为了保障整个数据集成过程高效且可靠,我们采用了轻易云平台提供的一系列特性,如高吞吐量的数据写入能力、实时监控与告警系统,以及自定义数据转换逻辑。
数据获取及处理方案
我们首先需要通过金蝶云星空API接口executeBillQuery
来获取所需的采购订单明细数据。这一步至关重要,因为它直接影响后续的数据处理与映射。这里我们特别注意到了接口的分页和限流问题,确保从源头上每一条记录都被精准抓取,避免任何漏单现象。
# 伪代码示例:调用executeBillQuery获取数据
response = executeBillQuery(api_params)
if response.status_code == 200:
data = response.json()
else:
handle_error(response)
数据格式转换及映射
考虑到金蝶云星空和MySQL之间存在一定的数据格式差异,为适应特定的业务需求,我们使用了轻易云的平台功能,对原始数据进行了必要的清洗和转换:
- 字段匹配:确认两边字段名一致。
- 类型转换:如日期格式、数值精度调整。
- 自定义逻辑:例如在不同状态下执行不同的数据处理流程。
这一过程中,不仅利用可视化设计工具进行了直观管理,还通过集中监控系统实时跟踪转化操作是否顺利进行,并及时报告任何异常情况。
-- 示例 SQL: 在 MySQL 中插入/更新采购订单明细信息
INSERT INTO purchase_order_details (order_id, status, update_time)
VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE
status=VALUES(status), update_time=VALUES(update_time);
大规模批量写入优化
由于涉及大量历史及变化频繁的数据,如何快速、高效地将其写入到MySQL成为关键的一环。在这方面,高吞吐量并发写入能力显得尤为重要,通过批处理方式显著减低网络开销,提高了整体性能。同时具备错误检测与重试机制,当遇到意外失败时能够保证任务继续推进,不影响整体进度。
# 伪代码示例:批量插入 MySQL 的方法
data_chunks = split_data(data) # 将大数据分块以便批量插入
for
![用友与MES系统接口开发配置](https://pic.qeasy.cloud/D11.png~tplv-syqr462i7n-qeasy.image)
### 调用金蝶云星空接口executeBillQuery获取并加工数据
在轻易云数据集成平台的生命周期中,调用源系统接口是数据处理的第一步。本文将深入探讨如何通过调用金蝶云星空的`executeBillQuery`接口获取采购订单明细,并进行初步的数据加工。
#### 接口配置与调用
首先,我们需要配置和调用金蝶云星空的`executeBillQuery`接口。根据提供的元数据配置,接口采用POST方法进行请求,主要参数如下:
- `FormId`: 业务对象表单Id,值为`PUR_PurchaseOrder`
- `FieldKeys`: 需查询的字段key集合
- `FilterString`: 过滤条件
- `Limit`: 最大行数,值为2000
- `StartRow`: 开始行索引
- `TopRowCount`: 返回总行数
具体请求示例如下:
```json
{
"FormId": "PUR_PurchaseOrder",
"FieldKeys": "FBillNo,FPurchaseOrgId.fnumber,FPOOrderEntry_fseq,FMaterialId.fnumber,FUnitId.fnumber,FEntryNote,FQty,FStockInQty,FDeliveryDate,FAllAmount_LC,F_caildj,FGiveAway,F_caizhi,FMaterialDesc,FModel,F_pinpai,F_jgdj,FMRPFreezeStatus,FPOOrderEntry_FEntryID,FModifyDate,FMRPCloseStatus,FMRPTerminateStatus,FMtoNo,FPriceUnitId.fnumber,FSourceBillNo,FTaxPrice,FEntryTaxRate",
"FilterString": "FPurchaseOrgId.fnumber in ('T02','T02.01') and FDocumentStatus in ('C') and (FMRPTerminateStatus='B' or FMRPCloseStatus='B' or FMRPFreezeStatus='B') and fdate>='2024-02-01'",
"Limit": "2000",
"StartRow": "{PAGINATION_START_ROW}",
"TopRowCount": true
}
数据解析与加工
在获取到原始数据后,需要对数据进行解析和加工。以下是主要字段及其对应关系:
fbillno
对应FBillNo
FPurchaseOrgId.fnumber
对应PORG_NUMBER
FPOOrderEntry_fseq
对应LINE_NUM
FMaterialId.fnumber
对应MATERIAL_ID
- ...
我们可以使用轻易云平台提供的自动填充响应功能来简化这一过程。通过设置autoFillResponse: true
,平台会自动将响应中的字段映射到预定义的字段中。
数据清洗与转换
在数据解析完成后,需要对数据进行清洗和转换,以确保其符合目标系统的要求。例如:
- 日期格式转换:将金蝶云返回的日期格式转换为目标系统所需的格式。
- 单位换算:如果物料单位在不同系统中定义不同,需要进行单位换算。
- 状态码映射:将金蝶云中的状态码(如冻结、终止、关闭)映射为目标系统中的状态码。
示例代码如下:
import datetime
def convert_date_format(date_str):
# 假设金蝶云返回的日期格式为YYYY-MM-DD
date_obj = datetime.datetime.strptime(date_str, '%Y-%m-%d')
# 转换为目标系统所需的格式,如MM/DD/YYYY
return date_obj.strftime('%m/%d/%Y')
def map_status_code(kingdee_status):
status_mapping = {
'B': '已关闭',
'A': '活动中'
}
return status_mapping.get(kingdee_status, '未知状态')
# 示例数据清洗与转换
raw_data = {
'FDeliveryDate': '2024-02-15',
'FMRPFreezeStatus': 'B'
}
cleaned_data = {
'需要日期': convert_date_format(raw_data['FDeliveryDate']),
'业务冻结状态': map_status_code(raw_data['FMRPFreezeStatus'])
}
数据写入
最后,将清洗和转换后的数据写入目标系统。这一步通常涉及调用目标系统的API接口,将处理好的数据推送过去。在此过程中,需要确保数据格式和字段名称与目标系统一致。
通过上述步骤,我们实现了从金蝶云星空获取采购订单明细,并对其进行初步加工,为后续的数据处理奠定了基础。这一过程展示了轻易云平台在异构系统集成中的强大能力,有效提升了业务透明度和效率。
使用轻易云数据集成平台进行ETL转换并写入MySQLAPI接口
在数据集成生命周期的第二步中,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台 MySQLAPI 接口所能够接收的格式,最终写入目标平台。本文将详细探讨如何使用轻易云数据集成平台完成这一过程。
元数据配置解析
元数据配置是实现数据转换和写入的关键。以下是我们需要处理的元数据配置:
{
"api": "execute",
"effect": "EXECUTE",
"method": "POST",
"idCheck": true,
"request": [
{
"field": "main_params",
"label": "KINGDEE_FREEZE_STATUS",
"type": "object",
"children": [
{"field": "ATTR_VALUE", "label": "扩展属性值", "type": "string", "value": "{{金蝶业务冻结}}"},
{"field": "PO_NUMBER", "label": "采购订单号", "type": "string", "value": "{{采购订单号}}"},
{"field": "TENANT_ID", "label": "租户ID", "type": "int", "value": 7},
{"field": "LINE_NUM", "label": "采购订单行号", "type": "int", "value": "{{采购订单行号}}"},
{"field": "ATTR_NAME", "label": "扩展属性名", "type": "string", "value":"KINGDEE_FREEZE_STATUS"}
]
},
{
...
}
],
...
}
数据请求与清洗
在此阶段,我们从源系统获取原始数据,并进行必要的清洗和标准化处理,以确保数据质量和一致性。具体步骤如下:
- 获取原始数据:通过调用源系统的API接口,获取采购订单明细状态更新的数据。
- 清洗与标准化:对获取的数据进行清洗,包括去除冗余信息、修正格式错误等操作。
数据转换与写入
在完成数据清洗后,我们进入关键的ETL转换阶段,将清洗后的数据转为目标平台 MySQLAPI 接口所能接收的格式,并最终写入目标平台。
-
定义转换规则:根据元数据配置,定义各字段的映射关系。例如:
ATTR_VALUE
映射为{{金蝶业务冻结}}
PO_NUMBER
映射为{{采购订单号}}
TENANT_ID
固定值为7
LINE_NUM
映射为{{采购订单行号}}
ATTR_NAME
固定值为KINGDEE_FREEZE_STATUS
-
构建请求对象:根据定义好的映射关系,构建请求对象。示例如下:
{
main_params: {
ATTR_VALUE: "<实际值>",
PO_NUMBER: "<实际值>",
TENANT_ID: 7,
LINE_NUM: "<实际值>",
ATTR_NAME: 'KINGDEE_FREEZE_STATUS'
},
extend_params_1: {
ATTR_VALUE: "<实际值>",
PO_NUMBER: "<实际值>",
TENANT_ID: 7,
LINE_NUM: "<实际值>",
ATTR_NAME: 'KINGDEE_MRPCLOSE_STATUS'
},
...
}
- 调用MySQLAPI接口:使用POST方法,将构建好的请求对象发送到MySQLAPI接口,实现数据写入。具体代码示例如下:
import requests
url = 'https://api.example.com/execute'
headers = {'Content-Type': 'application/json'}
data = {
# 构建好的请求对象
}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
print("Data successfully written to MySQL.")
else:
print("Failed to write data to MySQL:", response.text)
SQL更新语句
在某些情况下,我们可能需要直接执行SQL更新语句来更新数据库中的记录。根据元数据配置,我们可以生成相应的SQL语句。例如:
update ty_mes.mt_po_line_attr
set ATTR_VALUE=:ATTR_VALUE
WHERE PO_LINE_ID=(select po_line_id from ty_mes.mt_po_line where po_header_id=(select po_header_id from ty_mes.mt_po_header WHERE PO_NUMBER=:PO_NUMBER and TENANT_ID=:TENANT_ID) and LINE_NUM=:LINE_NUM) AND ATTR_NAME=:ATTR_NAME;
通过执行上述SQL语句,可以直接更新数据库中的相应记录。
总结
通过上述步骤,我们可以高效地将源平台的数据进行ETL转换,并成功写入目标平台 MySQLAPI 接口。这一过程不仅确保了数据的一致性和准确性,还提升了系统集成的效率和透明度。在实际操作中,根据具体需求调整元数据配置和转换规则,可以灵活应对各种复杂的数据集成场景。