案例分享:聚水潭·奇门数据集成到MySQL
在实际的系统对接需求中,如何高效、准确地将聚水潭·奇门平台中的售后单数据集成到MySQL数据库,成为不少企业亟需解决的问题。本文将以“聚水潭-售后单-->BI崛起-售后表【历史订单】”为方案,通过轻易云数据集成平台的实际操作经验,详细解析实现这一目标的技术要点与方法。
为了确保每一条来自聚水潭·奇门的数据都能正确无误地写入到MySQL,我们主要关注以下几个关键环节:
- 调用聚水潭·奇门接口获取数据:通过
jushuitan.refund.list.query
API接口定时抓取售后订单列表,这是整个流程的第一步。利用轻易云提供的可视化配置界面,可以方便地设置API调用参数,并处理分页和限流问题,确保全量数据不遗漏。 - 自定义数据转换逻辑:返回的数据可能格式多样且包含冗余信息,需要经过清洗和转换,以便符合MySQL表结构。这里我们利用平台支持的数据转换功能,自定义映射字段,实现精准转换。
- 快速批量写入到MySQL:为了应对大规模数据高吞吐量需求,我们采用了多线程批量提交机制,通过
execute
API接口批量插入,提高写入效率并降低数据库压力。同时,还设置了高效可靠的错误重试机制,以处理临时性网络或数据库故障。 - 实时监控与告警:实施过程中,全程启用集中监控和告警系统,对任务运行状态进行实时监控。一旦出现异常情况,例如接口响应超时或写入失败等,将立即触发告警通知相关负责人员,保障整合过程顺利进行。
通过上述步骤及相应技术手段,我们成功搭建了一套稳定、高效、安全的数据集成方案。本案例不仅提升了业务透明度,也显著提高了运营效率,在此基础上,我们还进行了若干优化,例如...
(待续)
调用聚水潭·奇门接口获取并加工数据
在数据集成生命周期的第一步,我们需要调用聚水潭·奇门接口jushuitan.refund.list.query
来获取售后单数据,并进行必要的数据加工。以下是具体的技术实现过程。
接口调用配置
首先,我们需要配置调用聚水潭·奇门接口的元数据。根据提供的元数据配置,接口请求采用POST方法,主要参数包括页码、页数、修改起始时间、修改结束时间等。以下是具体的请求参数配置:
{
"api": "jushuitan.refund.list.query",
"effect": "QUERY",
"method": "POST",
"number": "as_id",
"id": "{as_id}{modified}",
"name": "as_id",
"idCheck": true,
"request": [
{"field":"page_index","label":"页码","type":"string","describe":"页码","value":"1"},
{"field":"page_size","label":"页数","type":"string","describe":"页数","value":"50"},
{"field":"start_time","label":"修改起始时间","type":"datetime","describe":"开始时间","value":"_function LEFT( '{{DAYS_AGO_1|datetime}}' , 10)"},
{"field":"end_time","label":"修改结束时间","type":"datetime","describe":"结束时间","value":"_function LEFT( '{{CURRENT_TIME|datetime}}' , 10)"},
{"field":"so_ids","label":"线上单号列表","type":"string","describe":"线上单号列表"},
{"field":"date_type","label":"时间类型","type":"string","describe":"时间类型"},
{"field":"status","label":"售后单状态","type":"string","describe":"售后单状态"},
{"field":"good_status","label":"货物状态","type":"string", "describe": "BUYER_NOT_RECEIVED:买家未收到货,BUYER_RECEIVED:买家已收到货,BUYER_RETURNED_GOODS:买家已退货,SELLER_RECEIVED:卖家已收到退货"},
{"field": "type", "label": "售后类型", "type": "string", "describe": "普通退货,其它,拒收退货,仅退款,投诉,补发,换货,维修"}
],
"autoFillResponse": true,
"condition_bk":[
[{"field": "order_status", "logic": "eqv2", "value": "Sent"},
{"field": "shop_status", "logic": "eqv2", "value": "SUCCESS"}]
],
"beatFlat":["items"]
}
数据请求与清洗
在调用接口获取数据之后,需要对返回的数据进行清洗和预处理。这一步骤确保了数据的一致性和完整性,为后续的数据转换与写入打下基础。
- 分页处理:由于每次请求返回的数据量有限(默认每页50条),需要通过循环分页获取所有符合条件的数据。
- 字段筛选与转换:根据业务需求,对返回的数据字段进行筛选和转换。例如,将日期格式统一为标准格式,将状态码转换为易于理解的文本描述等。
- 去重处理:由于可能存在重复记录,需要对数据进行去重处理。可以使用
as_id
和modified
字段组合作为唯一标识符来判断记录是否重复。
以下是一个示例代码片段,用于实现分页请求和数据清洗:
import requests
import json
def fetch_refund_data(page_index=1, page_size=50):
url = 'https://api.jushuitan.com/refund/list/query'
headers = {'Content-Type': 'application/json'}
payload = {
'page_index': str(page_index),
'page_size': str(page_size),
'start_time': '2023-01-01',
'end_time': '2023-12-31'
# 添加其他必要参数
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
data = response.json()
return data['items']
else:
return []
# 分页获取所有数据
all_data = []
page_index = 1
while True:
data = fetch_refund_data(page_index)
if not data:
break
all_data.extend(data)
page_index += 1
# 数据清洗与去重
unique_data = {f"{item['as_id']}{item['modified']}": item for item in all_data}.values()
# 后续处理...
数据转换与写入
在完成数据清洗之后,需要将数据转换为目标系统所需的格式,并写入到BI崛起平台的售后表【历史订单】中。这一步骤通常涉及字段映射、格式转换以及批量写入操作。
例如,可以使用Python脚本或ETL工具将清洗后的数据批量插入到目标数据库中:
import pandas as pd
from sqlalchemy import create_engine
# 转换为DataFrame
df = pd.DataFrame(unique_data)
# 数据库连接配置
engine = create_engine('mysql+pymysql://user:password@host/dbname')
# 写入数据库
df.to_sql('bi_rise_refund_history', con=engine, if_exists='append', index=False)
通过上述步骤,我们完成了从聚水潭·奇门接口获取售后单数据并进行加工处理的全过程。这不仅确保了数据的一致性和完整性,还为后续的数据分析和业务决策提供了可靠的数据基础。
使用轻易云数据集成平台进行ETL转换并写入MySQL API接口
在数据集成过程中,将源平台的数据转换为目标平台可接受的格式是关键步骤之一。本文将详细探讨如何使用轻易云数据集成平台将聚水潭的售后单数据转换为BI崛起售后表的历史订单,并通过MySQL API接口写入目标平台。
配置元数据
首先,我们需要根据提供的元数据配置,定义好要转换的数据字段。以下是元数据配置的具体内容:
{
"api": "execute",
"effect": "EXECUTE",
"method": "SQL",
"number": "id",
"id": "id",
"name": "id",
"idCheck": true,
"request": [
{
"field": "main_params",
"label": "主参数",
"type": "object",
"describe": "对应主语句内的动态参数",
"children": [
{"field":"id","label":"主键","type":"string","value":"{as_id}-{items_asi_id}-{modified}"},
{"field":"as_id","label":"售后单号","type":"string","value":"{as_id}"},
{"field":"as_date","label":"申请时间","type":"string","value":"{as_date}"},
// ...(省略其他字段)
{"field":"buyer_apply_refund","label":"线上申请金额","type":"string","value":"{buyer_apply_refund}"}
]
}
],
"otherRequest": [
{
"field": "main_sql",
"label": "主语句",
"type": "string",
"describe": "SQL首次执行的语句,将会返回:lastInsertId",
"value":
`INSERT INTO refund_list_query(
id, as_id, as_date, outer_as_id, so_id, type, modified, status, remark, question_type, warehouse,
refund, payment, good_status, shop_buyer_id, shop_id, logistics_company, l_id, o_id, order_status,
drp_co_id_to, wh_id, drp_co_id_from, node, wms_co_id, shop_status, freight, labels,
refund_version,sns_sku_id,sns_sn,order_type,confirm_date,
items_outer_oi_id,items_receive_date,items_i_id,
items_combine_sku_id,items_asi_id,
items_sku_id,items_qty,
items_price,
items_amount,
items_name,
items_type,
items_properties_value,
items_r_qty,
items_sku_type,
items_shop_sku_id,
items_defective_qty,
items_shop_amount,
items_remark,
created,
ts,
shop_name,
order_label,
free_amount,
creator_name,buyer_receive_refund,buyer_apply_refund
) VALUES (
:id,:as_id,:as_date,:outer_as_id,:so_id,:type,:modified,:status,:remark,:question_type,:warehouse,:refund,:payment,:good_status,:shop_buyer_id,:shop_id,:logistics_company,:l_id,:o_id,:order_status,:drp_co_id_to,:wh_id,:drp_co_id_from,:node,:wms_co_id,:shop_status,:freight,:labels,:refund_version,sns_sku_id,sns_sn ,order_type ,confirm_date ,items_outer_oi_id ,items_receive_date ,items_i_id ,items_combine_sku_id ,items_asi_id ,items_sku _id ,items_qty ,items_price ,items_amount ,items_name ,items_type ,items_properties_value ,items_r_qty ,items_sku_type ,items_shop_sku _id ,items_defective_qty ,items_shop_amount ,items_remark ,created ,ts ,shop_name ,
order_label ,
free_amount ,
creator_name ,
buyer_receive_refund ,
buyer_apply_refund
);`
}
]
}
数据请求与清洗
在ETL过程的第一步,我们需要从源系统请求数据并进行必要的清洗。这一步主要涉及到对原始数据进行格式化、校验和标准化,以确保数据的一致性和完整性。例如:
SELECT
as.id AS as_id,
as.date AS as_date,
as.outer_as_no AS outer_as_no
FROM
after_sales as
WHERE
as.status = 'Confirmed';
在这个过程中,可以利用轻易云提供的可视化界面来简化操作,确保每个字段都能正确映射到目标数据库中的相应字段。
数据转换与写入
接下来,我们需要将清洗后的数据进行转换,使其符合目标平台MySQL API接口所能接收的格式。以下是具体的SQL插入语句示例:
INSERT INTO refund_list_query(
id, as_id, as_date, outer_as_no
) VALUES (
:id_param1-:item_param2-:modified_param3,:as_no_param4,:date_param5,:outer_no_param6
);
在这一步中,需要特别注意字段类型和长度匹配问题,以防止因不匹配导致的数据写入失败。
动态参数配置
为了实现灵活的数据写入,我们可以使用动态参数配置。在上述元数据配置中,main_params
字段定义了所有需要动态传递的参数。通过这种方式,可以确保每次执行插入操作时,都能够根据实际情况传递不同的参数值。
例如:
{
"field": ":id_param1-:item_param2-:modified_param3",
...
}
这样可以保证每条记录都有一个唯一标识符,从而避免重复插入或覆盖现象。
执行SQL语句
最后一步是执行生成好的SQL语句,将转换后的数据写入目标数据库。轻易云平台提供了多种方式来执行这些语句,包括同步和异步方式。在实际应用中,可以根据业务需求选择合适的方法。
通过上述步骤,我们成功地将聚水潭售后单的数据转换并写入到BI崛起售后表中,实现了不同系统间的数据无缝对接。这不仅提升了业务效率,还确保了数据的一致性和完整性。