系统对接集成案例:旺店通·旗舰奇门数据集成到MySQL
在构建高效的数据处理和管理系统中,数据的快速、高效、可靠传输是关键。本文将详细介绍如何利用轻易云数据集成平台,将“旺店通·旗舰版-采购入库单”通过API接口(wdt.wms.stockin.purchase.querywithdetail)解决方案,实现从原始查询到BI泰海系统的MySQL数据库表的无缝对接。
技术实现概述
-
接口调用与分页处理 使用
wdt.wms.stockin.purchase.querywithdetail
API获取采购入库单,我们在设计时需要特别注意分页和限流问题,以确保每页的数据都被完整、准确地抓取而不漏单。在实际操作中,通过添加批量任务并调节接口请求频率,有效降低了API调用异常风险。 -
实时监控与性能优化 为保障整个数据传输过程的稳定性,我们引入了集中监控和告警系统,实时跟踪各个节点的数据流动状态。同时,对采集端进行性能调优,提高吞吐量,使大量数据能以最快速度写入MySQL数据库,从而提升整体工作效率。
-
自定义转换逻辑与格式差异处理 面临不同系统之间的数据格式差异,通过轻易云平台灵活强大的自定义转换功能,可以将源数据结构调整为目标数据库所需格式。这一步骤不仅简化了后续处理操作,也为进一步分析提供了可靠基础。
-
异常检测与重试机制 在多线程、大批量操作情况下,不可避免会出现一些异常情况。我们设计了一套错误重试机制,如遇网络或API响应问题,可以自动重新尝试相关任务,确保所有有效记录最终成功传递至MySQL。此外,还设定报警阈值,当错误次数过高时及时通知管理员干预。
-
日志记录和透明化管理 数据处理过程中,引入全流程日志记录,帮助定位及解决可能存在的问题,并且通过可视化工具展示每一个环节的进度,让业务方明确了解当前汇总结果及其背后的运行细节,大大提高业务透明度及决策准确性。
通过这次配置实施,我们成功实现了从“旺店通·旗舰奇门”到“MySQL”的高效、安全、有序地同步,为企业全面掌握采购入库单信息及更好发挥BI报表功能奠定坚实基础。下一步,我们将分步骤描述具体部署过程,包括环境准备、参数设置以及代码实例等内容。
调用旺店通·旗舰奇门接口wdt.wms.stockin.purchase.querywithdetail获取并加工数据
在数据集成生命周期的第一步,我们需要调用源系统的API接口以获取原始数据,并对其进行初步加工。本文将详细探讨如何使用轻易云数据集成平台调用旺店通·旗舰奇门接口wdt.wms.stockin.purchase.querywithdetail
,并对返回的数据进行处理。
接口调用配置
首先,我们需要配置接口的元数据。以下是该接口的元数据配置:
{
"api": "wdt.wms.stockin.purchase.querywithdetail",
"effect": "QUERY",
"method": "POST",
"number": "order_no",
"id": "stockin_id",
"name": "tid",
"request": [
{
"field": "pager",
"label": "分页参数",
"type": "object",
"describe": "分页参数",
"children": [
{
"field": "page_size",
"label": "分页大小",
"type": "string",
"describe": "分页大小",
"value": "50"
},
{
"field": "page_no",
"label": "页号",
"type": "string",
"describe": "页号",
"value": "1"
}
]
},
{
"field": "params",
"label": "业务参数",
"type": "object",
"describe": "业务参数",
"children": [
{
"field": "start_time",
"label": "开始时间",
``type``: ``string``,
``describe``: ``开始时间``,
``value``: ``{{LAST_SYNC_TIME|datetime}}``
},
{
``field``: ``end_time``,
``label``: ``结束时间``,
``type``: ``string``,
``describe``: ``结束时间``,
``value``: ``{{CURRENT_TIME|datetime}}``
}
]
}
],
``autoFillResponse``: true,
``beatFlat``: [``details_list``]
}
请求参数解析
在请求参数中,我们主要关注两个部分:分页参数和业务参数。
-
分页参数:
page_size
: 每页返回的数据条数,默认设置为50。page_no
: 当前请求的页码,默认设置为1。
-
业务参数:
start_time
: 数据查询的起始时间,使用占位符{{LAST_SYNC_TIME|datetime}}
动态填充。end_time
: 数据查询的结束时间,使用占位符{{CURRENT_TIME|datetime}}
动态填充。
这些参数确保了我们可以灵活地控制数据查询的范围和分页,从而高效地获取所需数据。
数据获取与初步加工
在成功调用接口并获取到原始数据后,我们需要对返回的数据进行初步加工。由于接口返回的数据可能包含嵌套结构,因此我们需要将其扁平化处理,以便后续的数据转换和写入操作。
例如,接口返回的数据可能包含一个名为details_list
的嵌套字段。通过配置中的beatFlat
属性,我们可以自动将该字段扁平化处理,使得每条记录都能独立存在于最终的数据集中。
实践案例
假设我们需要从2024年1月1日起至今的采购入库单数据,可以通过以下步骤实现:
-
设置请求参数:
{ ``pager``: { ``page_size``: ``50``, ``page_no``: ``1`` }, ``params``: { ``start_time``: ``2024-01-01T00:00:00Z``, ``end_time``: `CURRENT_TIMESTAMP` } }
-
调用API: 使用POST方法发送上述请求至接口地址,并获取响应数据。
-
处理响应数据: 对返回的数据进行扁平化处理,将嵌套字段如
details_list
展开为独立记录。
通过上述步骤,我们能够高效地从旺店通·旗舰奇门系统中获取采购入库单数据,并为后续的数据转换和写入做好准备。这一过程展示了如何利用轻易云数据集成平台的强大功能,实现复杂系统间的数据无缝对接。
使用轻易云数据集成平台进行ETL转换并写入MySQL
在数据集成生命周期的第二步,我们将重点探讨如何使用轻易云数据集成平台将源平台的数据进行ETL(提取、转换、加载)转换,并最终写入目标平台MySQL。本文将深入探讨系统接口和数据集成的技术细节。
数据提取与清洗
首先,从源平台(旺店通旗舰版)提取采购入库单的数据。这一步骤涉及到对原始数据的获取和初步清洗,以确保数据质量和一致性。由于本文重点在于ETL转换,因此不再详细描述数据提取过程。
数据转换与写入
接下来,进入ETL生命周期中的关键步骤:数据转换与写入。我们需要将已经清洗好的数据转化为目标平台MySQL能够接收的格式,并通过API接口将其写入数据库。
元数据配置解析
以下是元数据配置,用于定义如何将源平台的数据字段映射到目标MySQL数据库表中:
{
"api": "batchexecute",
"effect": "EXECUTE",
"method": "SQL",
"number": "id",
"id": "id",
"name": "id",
"idCheck": true,
"request": [
{"field":"stockin_id","label":"入库单id","type":"string","value":"{stockin_id}"},
{"field":"order_no","label":"入库单号","type":"string","value":"{order_no}"},
{"field":"warehouse_no","label":"仓库编号","type":"string","value":"{warehouse_no}"},
{"field":"status","label":"状态","type":"string","value":"{status}"},
{"field":"modified","label":"修改时间","type":"string","value":"{{modified|datetime}}"},
{"field":"created_time","label":"制单时间","type":"string","value":"{{created_time|datetime}}"},
{"field":"remark","label":"备注","type":"string","value":"{remark}"},
{"field":"logistics_type_name","label":"入库单物流类型","type":"string","value":"{logistics_type_name}"},
{"field":"check_time","label":"审核时间","type":"string","value":"{{check_time|datetime}}"},
{"field":"purchase_id","label":"采购单id","type":"string","value":"{purchase_id}"},
{"field":"purchase_no","label":"采购单号","type":"string","value":"{purchase_no}"},
{"field":"goods_count","label":"货品数量","type":"string","value":"{goods_count}"},
{"field":"provider_no","label":"供应商编号","type\":\"string\",\"value\":\"{provider_no}\"},
{"field\":\"provider_name\",\"label\":\"供应商名称\",\"type\":\"string\",\"value\":\"{provider_name}\"},
{\"field\":\"logistics_no\",\"label\":\"物流单号\",\"type\":\"string\",\"value\":\"{logistics_no}\"},
{\"field\":\"logistics_name\",\"label\":\"物流公司\",\"type\":\"string\",\"value\":\"{logistics_name}\"},
{\"field\":\"goods_amount\",\"label\":\"货品总价格,不包含优惠\",\"type\":\"string\",\"value\":\"{goods_amount}\"},
{\"field\":\"total_price\",\"label\":\"税前总货款(折后)\",\"type\":\"string\",\"value\":\"{total_price}\"},
{\"field\":\"tax_amount\",\"label\":\"税后总额(折后)\",\"type\":\"string\",\"value\":\"{tax_amount}\"},
{\"field\":\"total_stockin_price\",\"label\":\"入库总金额\",\"type\":\”string\”,\”value\”:\”{total_stockin_price}\”},
{\"field\":\”flag_name\”,\”label\”: \”标记名称\”, \”type\”: \”string\”, \”value\”: \”{flag_name}\”},
{\"field\": \”operator_name\", \”label\": \”经办人\", \”type\": \”string\", \”value\": \"{operator_name}\"},
{\"field\": \"details_list_rec_id\", \"label\": \"入库单明细id\", \"type\": \"string\", \"value\": \"{details_list_rec_id}\"},
{\"field\": \"details_list_num\", \"label\": \"数量\", \"type\": \"string\", \"value\": \"{details_list_num}\"},
{\"field\": \"details_list_discount\", \"label\": \"折扣\", \"type\": \"string\", \"value\": \"{goods_amount}\"},
{\"field\": \"details_list_cost_price\", \"label\": \"税前单价\", \"type\": “ string”, “ value”: “ {details_list_cost_price}\ ” },
{\" field”: “ details_list_src_price”, “ label”: “ 税前折后单价”, “ type”: “ string”, “ value”: “ {details_list_src_price}\ ” },
{\" field”: “ details_list_tax_price”, “ label”: “ 税后单价”, “ type”: “ string”, “ value”: “ {details_list_tax_price}\ ” },
{\" field”: “ details_list_tax_amount”, “ label”: “ 税后金额(税后总价)”,“ type”:" string "," value ":" {details_list_tax_amount}" },
{\" field ":“ details_list_tax ",“ label ":“ 税率 ",“ type ":“ string ",“ value ":“ {details_list_tax}" },
{\" field ":“ details_list_total_cost ",“ label ":“ 税前金额(税前总价)",“ type ":" string "," value ":" {details_list_total_cost}" },
{\" field ":“ details_list_remark ",“ label ":“ 入库单明细备注 ",“ type ":" string "," value ":" {details_list_remark}" },
{\" field ":“ details_list_goods_name ",“ label ":“ 货品名称 ",“ type ":" string "," value ":" {details_list_goods_name}" },
{\" field ":“ details_list_goods_no ",“ label ":“ 货品编号 ",“ type ":" string "," value ":" {details_list_goods_no}" },
{\" field ":“ details_list_spec_no ",“ label ":“ 商家编码 ",“ type ":" string "," value ":" {details_list_spec_no}" },
{\" field ":“ details_list_spec_code ",“ label ":‘规格码’,‘类型’:‘字符串’,‘值’:‘{详细列表规格代码}’}
{\" field ': ‘详细列表道具1’, ‘标签’:‘采购明细自定义属性1’, ‘类型’:‘字符串’, ‘值’:‘{详细列表道具1}’}
{\" 字段': ‘详细列表道具2’, ‘标签’:‘采购明细自定义属性2’, ‘类型’:‘字符串’, ‘值’:‘{详细列表道具2}’}
{\" 字段': ‘详细列表道具3’, ‘标签’:‘采购明细自定义属性3’, ‘类型’:‘字符串’, ‘值’:‘{详细列表道具3}’}
{\" 字段': ‘详细列表道具4’, ‘标签’:‘采购明细自定义属性4’, ‘类型’:‘字符串’, ‘值’:‘{详细列表道具4}’}
{\" 字段': '详情_列表_规格_名称', '标签':'规格名称', '类型':'字符串', '值':'{详情_列表_规格_名称}'}
{\" 字段': '详情_列表_品牌_名称', '标签':'品牌名称', '类型':'字符串', '值':'{详情_列表_品牌_名称}'}
{\" 字段': '详情_列表_单位_名称', '标签':'单位', '类型':'字符串', '值':'{详情_列表_单位_名称}'}
{\" 字段': '详情_列表批次号', '标签':'批次', '类型':'字符串', '值':'{详情批次号}'}
{\" 字段': '详情批次有效期', '标签':'有效期', '类型':'字符串', '值':'{{有效期|日期时间}}'}
{\" 字段': '生产日期', '标签':'生产日期', '类型':'字符串', '值':'{{生产日期|日期时间}}'}
{\" 字段': '位置编号', '标签':'货位', '类型':'字符串', '值':'位置编号'}
{\" 字段': 是否缺陷, 标签: 是否缺陷, 类型: 字符串, 值: 是否缺陷 }
],
"otherRequest":[
{
"字段":主sql,
标签: 主语句,
类型: 字符串,
描述: SQL首次执行的语句,将会返回:lastInsertId,
值: REPLACE INTO wdt_wms_stockin_purchase_querywithdetail (stockin_id,order_no,warehouse_no,status,modified,created_time,remark,logistics_type_name,check_time,purchase_id,purchase_no,goods_count,provider_no,provider_name,logistics_no,logistics_name,goods_amount,total_price,tax_amount,total_stockin_price,flag_name,operator_name,details_rec_id,num_discount,cost_price_src_tax_total_cost_remark_goods_goods_spec_code_prop1_prop2_prop3_prop4_spec_brand_unit_batch_expire_production_position_defect_unit_ratio_purchase_unit_stockin) VALUES
},
{
字段: limit,
标签: limit,
类型: 字符串,
值:100
}
]
}
在这个配置中,每个request
字段都定义了一个从源平台到目标MySQL表的映射关系。例如:
{"字段":"stockin_id","标签":"入库单id","类型":"字符串","值":"stockin_id"}
将源平台的stockin_id
映射到目标表中的stockin_id
字段。{"字段":"modified","标签":"修改时间","类型":"字符串","值":"{{modified|datetime}}"}
将源平台的modified
字段经过日期时间格式化后映射到目标表中的modified
字段。
SQL主语句配置
主语句用于执行实际的数据插入操作:
{
"字段":主sql,
标签: 主语句,
类型: 字符串,
描述: SQL首次执行的语句,将会返回:lastInsertId,
值: REPLACE INTO wdt_wms_stockin_purchase_querywithdetail (stockin_id,order_no,warehouse_no,status,modified,created_time,remark,logistics_type_name,check_time,purchase_id,purchase_no,goods_count,provider_no,provider_name,logistics_no,logistics_name,goods_amount,total_price,tax_amount,total_stockin_price) VALUES
}
该主语句使用了REPLACE INTO语法,将所有映射后的字段插入到目标表中。如果记录已经存在,则替换现有记录。
数据写入过程
通过调用API接口 batchexecute
并传递上述配置,我们可以实现批量执行SQL插入操作。API请求示例如下:
{
api:"batchexecute",
effect:"EXECUTE",
method:"SQL",
number:"id",
id:"id",
name:"id",
idCheck:true,
request:[
// 映射后的具体请求内容...
],
otherRequest:[
{
field:"main_sql",
label:"主语句",
type:"string",
describe:"SQL首次执行的语句,将会返回:lastInsertId",
value:"REPLACE INTO wdt_wms_stockin_purchase_querywithdetail (stockin_id,..."
},
{
field:"limit",
label:"limit",
type:"string",
value:"100"
}
]
}
通过上述配置和API调用,可以高效地将源平台的数据转换并写入到目标MySQL数据库中,实现不同系统间的数据无缝对接。
总结
本文深入探讨了如何使用轻易云数据集成平台进行ETL转换,并通过API接口将处理后的数据写入目标MySQL数据库。在实际操作中,通过精确配置元数据和合理设计API请求,可以高效完成复杂的数据集成任务,提高业务透明度和效率。