案例分享:金蝶物料=>互客商品的数据集成
在本篇技术案例中,我们将具体讨论如何实现金蝶云星空的物料数据无缝集成到网易互客商品系统。在整个过程中,关键API接口包括从金蝶云星空获取数据的executeBillQuery
和向网易互客写入数据的openapi/goods/createGoods
。我们将深入探讨多项技术细节,如分页与限流处理、异步任务调度及异常处理机制。
为了确保每一步都高效且不漏单,我们采用了一系列最佳实践。例如,通过定时可靠的抓取任务来调用金蝶云星空接口,并利用批量集成技术快速写入大量数据到网易互客。此外,为了应对不同系统之间的数据格式差异,我们设计了灵活且高效的数据映射方式。同时,在面对潜在的网络波动或服务器错误时,实现了一整套完善的错误重试机制,从而保证数据对接过程稳定可控。
首要步骤是通过金蝶云星空提供的executeBillQuery
接口精准提取所需物料信息。这一操作需要考虑分页以及限流策略,以防止API调用次数过载。之后,这些原始数据信息将经过清洗与转换,以符合网易互客系统所要求的数据格式标准,然后使用其提供创建商品信息的 openapi/goods/createGoods
接口完成数据写入。
针对实时监控与日志记录需求,每一个操作环节都会生成详细日志,并可以通过轻易平台可视化界面进行全程追踪。这不仅提升了透明度,也为后续问题排查提供了有效依据。
调用金蝶云星空接口executeBillQuery获取并加工数据
在数据集成过程中,调用源系统接口是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用金蝶云星空的executeBillQuery
接口来获取并加工物料数据。
配置元数据
首先,我们需要配置元数据以便正确调用接口。以下是我们使用的元数据配置:
{
"api": "executeBillQuery",
"method": "POST",
"number": "FNumber",
"id": "FMATERIALID",
"pagination": {
"pageSize": 100
},
"idCheck": true,
"request": [
{"field":"FMATERIALID","label":"实体主键","type":"string","value":"FMATERIALID"},
{"field":"FNumber","label":"编码","type":"string","value":"FNumber"},
{"field":"FName","label":"名称","type":"string","value":"FName"},
{"field":"FSpecification","label":"规格型号","type":"string","value":"FSpecification"},
{"field":"FOldNumber","label":"旧物料编码","type":"string","value":"FOldNumber"},
{"field":"FBARCODE","label":"条码","type":"string","value":"FBARCODE"},
{"field":"FDescription","label":"描述","type":"string","value":"FDescription"},
{"field":"FMaterialGroup_FNumber","label":"物料分组","type":"string","value":"FMaterialGroup.FNumber"},
{"field":"FErpClsID","label":"物料属性","type":"string","value":"FErpClsID"},
{"field":"FDocumentStatus","label":"数据状态","type":"string","value":"FDocumentStatus"},
{"field":"FForbidStatus","label":"禁用状态","type":"string","value":"FForbidStatus"},
{"field":"FBaseUnitId_FNumber","label":"基本单位.编码","type":"string","value":"FBaseUnitId.FNumber"},
{"field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FApproveDate>='{{LAST_SYNC_TIME|dateTime}}'"}
],
"otherRequest":[
{"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"},
{"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"},
{"field": "TopRowCount", "label": "返回总行数", "type": int, describe: 金蝶的查询分页参数},
{"field": FieldKeys, label: 需查询的字段key集合, type: array, describe: 金蝶分录主键ID格式: FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber, parser: {name: ArrayToString, params: ,}},
{ field: FormId, label: 业务对象表单Id, type: string, describe: 必须填写金蝶的表单ID如:PUR_PurchaseOrder,value:"BD_MATERIAL"}
]
}
调用API接口
根据上述元数据配置,我们可以通过POST请求调用executeBillQuery
接口。以下是一个示例请求体:
{
"_api_": "/k3cloud/Kingdee.BOS.WebApi.ServicesStub.DynamicFormService.ExecuteBillQuery.common.kdsvc",
"_method_": POST,
"_params_":{
FormId:"BD_MATERIAL",
FilterString:"FApproveDate>='2023-01-01'",
FieldKeys:"FMATERIALID,FNumber,FName,FSpecification,FBaseUnitId.FNumber"
Limit:"100",
StartRow:"0"
}
}
数据清洗与转换
在获取到原始数据后,需要对其进行清洗和转换,以确保符合目标系统的要求。例如,我们可以将字段名进行映射,将不需要的数据字段剔除,并对某些字段进行格式化处理。
以下是一个简单的数据清洗与转换示例:
def clean_and_transform(data):
cleaned_data = []
for item in data:
transformed_item = {
'material_id': item['FMATERIALID'],
'number': item['FNumber'],
'name': item['FName'],
'specification': item['FSpecification'],
'base_unit': item['FBaseUnitId.FNumber']
}
cleaned_data.append(transformed_item)
return cleaned_data
分页处理
由于每次请求只能返回有限数量的数据,因此需要进行分页处理。我们可以通过调整StartRow
和Limit
参数来实现分页。
以下是一个简单的分页处理逻辑:
def fetch_all_data():
all_data = []
start_row = 0
page_size = 100
while True:
response = call_api(start_row=start_row, limit=page_size)
data = response['data']
if not data:
break
all_data.extend(data)
start_row += page_size
return all_data
通过上述步骤,我们可以高效地从金蝶云星空系统中获取并加工所需的数据,为后续的数据写入和集成打下坚实基础。
轻易云数据集成平台ETL转换与写入网易互客API接口技术案例
在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并最终写入目标平台——网易互客API接口。以下是详细的技术步骤和配置示例。
1. 数据提取与初步清洗
首先,我们从金蝶物料系统中提取原始数据。假设我们已经通过轻易云平台完成了数据请求与初步清洗,接下来我们将重点讨论如何将这些数据转换为网易互客API接口所能接收的格式。
2. 数据转换
根据提供的元数据配置,我们需要将金蝶物料系统中的字段映射到网易互客API接口所需的字段。以下是具体的字段映射关系:
FName
映射到title
和body
F_BCYS_SalePrice
映射到price
FNumber
映射到outerId
- 固定值 "6e12eddb23f44dbcb071447d24d16e65" 映射到
creator
3. 配置元数据
根据元数据配置,构建请求体。以下是一个示例请求体:
{
"title": "{FName}",
"price": "{F_BCYS_SalePrice}",
"outerId": "{FNumber}",
"body": "{FName}",
"storeNum": "",
"creator": "6e12eddb23f44dbcb071447d24d16e65"
}
4. 数据写入目标平台
使用HTTP POST方法,将转换后的数据写入网易互客API接口。具体实现如下:
import requests
import json
# 定义API端点和请求头
api_url = "https://api.example.com/openapi/goods/createGoods"
headers = {
'Content-Type': 'application/json'
}
# 构建请求体
data = {
"title": source_data['FName'],
"price": source_data['F_BCYS_SalePrice'],
"outerId": source_data['FNumber'],
"body": source_data['FName'],
"storeNum": "", # 假设库存信息需要另行处理
"creator": "6e12eddb23f44dbcb071447d24d16e65"
}
# 转换为JSON格式
payload = json.dumps(data)
# 发起POST请求
response = requests.post(api_url, headers=headers, data=payload)
# 检查响应状态码
if response.status_code == 200:
print("Data successfully written to 网易互客")
else:
print(f"Failed to write data: {response.status_code}, {response.text}")
5. 实时监控与错误处理
在实际操作中,实时监控和错误处理至关重要。确保每个环节都能正确执行,并在出现问题时及时采取措施。例如,可以设置日志记录和告警机制,以便快速定位并解决问题。
import logging
# 设置日志记录
logging.basicConfig(filename='integration.log', level=logging.INFO)
try:
response = requests.post(api_url, headers=headers, data=payload)
response.raise_for_status()
logging.info("Data successfully written to 网易互客")
except requests.exceptions.HTTPError as err:
logging.error(f"HTTP error occurred: {err}")
except Exception as err:
logging.error(f"Other error occurred: {err}")
通过以上步骤,我们实现了从金蝶物料系统到网易互客平台的数据ETL转换和写入。这不仅提高了数据处理效率,还确保了各个环节的透明度和可追溯性。