旺店通企业奇门数据集成到用友BIP:采购入库单系统对接方案
在数据密集型业务中,如何实现跨平台的数据无缝对接一直是一个具有挑战性的技术难题。本文将聚焦于通过轻易云数据集成平台,将旺店通·企业奇门中的采购入库单数据高效且稳定地写入到用友BIP系统的具体实施案例。在这个实例中,我们采用了 wdt.stockin.order.query.purchase
API 来获取旺店通·企业奇门系统中的数据,并使用 /yonbip/scm/purinrecord/mergeSourceData/save
API 实现向用友BIP的写入操作。
集成案例背景
本次项目需求主要涉及以下几个关键步骤:
- API调用和分页处理:为确保从旺店通·企业奇门接口获取的数据不漏单,通过
wdt.stockin.order.query.purchase
API 进行定时抓取,并处理分页和限流问题。 - 数据格式转换:由于两套系统间的数据结构差异,需要针对不同字段进行准确的映射及格式转换,以确保传输后能被正确解析。
- 批量高效写入:大量的数据需要快速、稳定地写入到用友BIP,这要求我们设计一套可靠的批量提交机制。
- 实时监控与日志记录:为了保证整个流程透明可追溯,必须实现每个步骤的实时监控以及详细日志记录。
技术细节分享
首先,在实际实施过程中,有必要特别关注以下技术环节:
-
接口调用频率控制与异常处理:
- 调用于
wdt.stockin.order.query.purchase
接口时,由于其存在请求速率限制(Rate Limiting),我们引入了限流算法来优化请求频次。同时,为提升业务连续性,实现接口调用失败后的重试机制,这是确保集成过程稳健的重要手段之一。
- 调用于
-
批量快速写入策略:
- 将从旺店通获取的大量订单信息经过统一格式化后,再利用并发多线程方式,通过
/yonbip/scm/purinrecord/mergeSourceData/save
接口批量上传至用友BIP。此举不仅加快了传输速度,同时也降低了网络负载,使整体效率显著提高。
- 将从旺店通获取的大量订单信息经过统一格式化后,再利用并发多线程方式,通过
-
自动化任务调度与管理:
- 为保障定时抓取功能可靠运行,采用了一系列任务调度工具来设定固定时间周期执行采集动作。这些工具不仅支持复杂条件判断,还能提供灵活的错误重试及通知报警功能,大幅提升了任务运维质量。
以上是该系统对接的一部分关键技术
调用旺店通·企业奇门接口获取并加工数据的技术案例
在数据集成过程中,调用源系统接口是关键的一步。本文将详细探讨如何通过轻易云数据集成平台调用旺店通·企业奇门接口 wdt.stockin.order.query.purchase
获取采购入库单数据,并进行初步加工。
接口调用配置
首先,我们需要配置接口调用的元数据。根据提供的元数据配置,接口调用采用POST方法,主要参数如下:
- api:
wdt.stockin.order.query.purchase
- method:
POST
- number:
order_no
- id:
order_no
- pagination: 每页返回50条记录
- idCheck: true
请求参数包括:
- start_time: 增量获取数据的开始时间,格式为
yyyy-MM-dd HH:mm:ss
。 - end_time: 增量获取数据的结束时间,格式为
yyyy-MM-dd HH:mm:ss
。 - status: 入库单状态,默认值为80(已完成)。
- src_order_no: 上层单据编号,可选。
- warehouse_no: 仓库编号,用于区分不同仓库的数据。
分页参数:
- page_size: 每页返回的数据条数,默认为50。
- page_no: 页号,从0开始。
数据请求与清洗
在实际操作中,我们需要通过API请求获取采购入库单数据,并对其进行初步清洗。以下是具体步骤:
-
构建请求参数
根据元数据配置和业务需求,构建API请求参数。例如:
{ "start_time": "2023-01-01 00:00:00", "end_time": "2023-01-31 23:59:59", "status": "80", "warehouse_no": "WH001", "page_size": 50, "page_no": 0 }
-
发送API请求
使用HTTP POST方法发送请求至
wdt.stockin.order.query.purchase
接口,并接收响应数据。示例代码如下:import requests import json url = "https://api.wangdian.cn/openapi2/wdt.stockin.order.query.purchase" headers = {"Content-Type": "application/json"} payload = { "start_time": "2023-01-01 00:00:00", "end_time": "2023-01-31 23:59:59", "status": "80", "warehouse_no": "WH001", "page_size": 50, "page_no": 0 } response = requests.post(url, headers=headers, data=json.dumps(payload)) data = response.json()
-
处理响应数据
对返回的数据进行初步清洗和处理。例如,将时间字段转换为标准格式、过滤无效记录等。示例代码如下:
from datetime import datetime def clean_data(data): cleaned_data = [] for record in data['orders']: # 转换时间格式 record['create_time'] = datetime.strptime(record['create_time'], '%Y-%m-%d %H:%M:%S') # 过滤无效记录(例如状态不为80的记录) if record['status'] == '80': cleaned_data.append(record) return cleaned_data cleaned_data = clean_data(data)
数据转换与写入
在完成初步清洗后,可以将数据进一步转换并写入目标系统。这一步通常涉及到字段映射、格式转换等操作。在轻易云平台上,这些操作可以通过可视化界面配置完成。
例如,将采购入库单中的字段映射到目标系统的相应字段,并确保数据格式符合目标系统要求。
def transform_and_write(cleaned_data):
transformed_data = []
for record in cleaned_data:
transformed_record = {
'order_id': record['order_no'],
'warehouse': record['warehouse_no'],
'created_at': record['create_time'].strftime('%Y-%m-%d %H:%M:%S'),
# 更多字段映射...
}
transformed_data.append(transformed_record)
# 将转换后的数据写入目标系统(示例代码)
write_to_target_system(transformed_data)
def write_to_target_system(data):
# 示例:将数据写入数据库或其他存储系统
pass
transform_and_write(cleaned_data)
通过上述步骤,我们实现了从旺店通·企业奇门接口获取采购入库单数据,并对其进行初步清洗和转换,为后续的数据处理和分析奠定基础。在实际应用中,可以根据具体业务需求进一步优化和扩展这些操作。
轻易云数据集成平台生命周期的第二步:将源平台数据转换并写入用友BIPAPI接口
在轻易云数据集成平台中,数据生命周期的第二步是将已经集成的源平台数据进行ETL转换,并最终写入目标平台。在本文中,我们将重点探讨如何将采购入库单的数据转换为用友BIPAPI接口所能接收的格式,并通过API接口将其写入目标平台。
数据请求与清洗
首先,我们需要确保从源系统获取的数据是完整且准确的。这一步包括数据请求、清洗和初步处理。假设我们已经完成了这些步骤,现在我们进入数据转换与写入阶段。
数据转换与写入
在这个阶段,我们使用轻易云提供的元数据配置来进行ETL转换。以下是具体的元数据配置和如何应用这些配置来实现数据的转换与写入。
{
"api": "/yonbip/scm/purinrecord/mergeSourceData/save",
"method": "POST",
"idCheck": true,
"BIPAudit": "/yonbip/scm/purinrecord/batchaudit",
"request": [
{
"field": "resubmitCheckKey",
"label": "保证请求的幂等性",
"type": "string",
"describe": "该值由客户端生成,并且必须是全局唯一的长度不能超过32位。更多信息,请参见«MDD幂等性»",
"value": "{order_no}"
},
{
"field": "mergeSourceData",
"label": "合并上游数据",
"type": "string",
"describe": "固定值传入true。",
"value": "true"
},
{
"field": "needCalcLines",
"label": "表体行计算标识",
"type": "string",
"describe": "固定值传入true。",
"value": "true"
},
{
"field": "calcLinesKey",
"label": "表体行计算依据",
"type": "string",
"describe": "固定值传入true。",
"value": "{oriTaxUnitPrice}"
},
{
...
以上JSON片段展示了部分关键字段及其描述。在实际操作中,我们需要根据业务需求填充这些字段。以下是具体字段的详细解析:
- resubmitCheckKey:用于保证请求的幂等性,必须是全局唯一。例如,可以使用订单号
{order_no}
作为该值。 - mergeSourceData:固定传入
true
,表示合并上游数据。 - needCalcLines:固定传入
true
,表示需要计算表体行。 - calcLinesKey:用于表体行计算依据,这里指定为含税单价
{oriTaxUnitPrice}
。
表头参数配置
接下来,我们配置表头参数:
{
...
{
...
{
...
{
...
{
...
{
...
{
...
{
...
{
...
{
...
{
...
{
...
{
...
}
}
}
}
}
}
}
}
}
}
}
- code:单据编号,根据系统编码规则配置。例如,使用订单号
{order_no}
。 - vouchdate:单据日期,格式为YYYYMMDD。例如,可以使用检查时间
{{check_time|date}}
。 - bustype:交易类型ID或代码。例如,这里指定为
A15001
。 - warehouse:仓库ID或代码。这里可以通过查询上游单据获取,例如:
_findCollection find purchaseOrders_warehouse_code from aaafe9d3-cba3-367f-8b42-1dbd870e0564 where code={src_order_no} product_cCode={{details_list.goods_no}}
表体参数配置
最后,我们配置表体参数:
{
...
{
...
{"field":"purInRecords","label":"采购入库单子表","type":"array","children":[{"field":"makeRuleCode","label":"生单规则编码","type":"string","describe":"固定值。订单入库:st_purchaseorder订单退库:st_purchaseorder3,退货入库:st_purchaseorder_return,到货入库:pu_arrivalorder","value":"st_purchaseorder","parent":"purInRecords"},{"field":"sourceid","label":"上游单据主表id","type":"string","value":"_findCollection find new_id from aaafe9d3-cba3-367f-8b42-1dbd870e0564 where code={src_order_no} product_cCode={{details_list.goods_no}}","parent":"purInRecords"},{"field":"sourceautoid","label":"上游单据子表id","type":"string","value":"_findCollection find new_purchaseOrders_id from aaafe9d3-cba3-367f-8b42-1dbd870e0564 where code={src_order_no} product_cCode={{details_list.goods_no}}","parent":"purInRecords"},{"field":"qty","label":"数量","type":"string","describe":"默认上游带入,若传入数量请确保表体行计算标识needCalcLines=true,并确认calcLinesKey是否符合实际业务场景","value":"{{details_list.right_num}}","parent":"purInRecords"},{"field":"oriTaxUnitPrice","label":"含税单价","type":"string","describe":"仅在表头参数表体行计算标识needCalcLines=true,且calcLinesKey指定当前字段为计算依据时生效。","value":"_findCollection find oriTaxUnitPrice from aaafe9d3-cba3-367f-8b42-1dbd870e0564 where code={src_order_no} product_cCode={{details_list.goods_no}}","parent":"purInRecords"},{"field":"memo","label":"备注","type":"string","value":"{{details_list.remark}}"}],"value":["details_list"]}
}
- makeRuleCode:生单规则编码,例如订单入库可以设置为
st_purchaseorder
。 - sourceid和sourceautoid:分别为上游单据主表和子表ID,通过查询获取:
_findCollection find new_id from aaafe9d3-cba3-367f-8b42-1dbd870e0564 where code={src_order_no} product_cCode={{details_list.goods_no}}
最终提交
完成所有字段配置后,通过POST方法将处理后的数据提交到用友BIPAPI接口:
POST /yonbip/scm/purinrecord/mergeSourceData/save HTTP/1.1
Host: api.yonyou.com
Content-Type: application/json
Authorization: Bearer {access_token}
{
// 配置好的JSON请求体
}
通过上述步骤,我们实现了从源系统到目标系统的数据ETL转换,并成功写入用友BIPAPI接口。这一过程不仅确保了数据的一致性和准确性,还提高了业务流程的自动化程度。