测试-查询销售渠道信息(已删除数据)-dange 集成案例解析
在此次技术案例中,我们将详细解析如何通过轻易云数据集成平台,将吉客云的数据高效、可靠地集成到MySQL数据库中。本次方案旨在实现对吉客云销售渠道信息,包括已删除数据的定时抓取,并批量写入到MySQL中,方案名称为“测试-查询销售渠道信息(已删除数据)-dange”。
注意事项与挑战
对于此次集成任务,我们主要面临以下几个技术挑战:
- 大量数据快速写入:确保从吉客云获取的大量销售渠道信息能够高吞吐量地写入到MySQL,实现数据处理高效性。
- 实时监控和异常处理:全面跟踪每一个数据集成任务的状态与性能,同时提供有效的错误重试机制,以应对网络波动或接口调用失败等情况。
- 分页与限流问题:由于API接口返回的数据量较大,需要合理设计分页逻辑,并解决限流带来的潜在影响。
方案概要
本次实施集中使用了如下特性:
- 使用
erp.sales.get
API 从吉客云定时获取销售信息,并包含已删除的数据记录,保障不漏单。 - 利用轻易云提供的可视化流程设计工具,实现自定义的数据转换逻辑,使得不同格式之间的数据顺利映射至目标表结构。
- 借助中心化监控告警系统,对整个集成过程进行实时监控,及时发现并处理异常情况,通过日志记录功能追踪每一步操作。
接下来我们将深入剖析此集成案例,从具体API调用、分页策略、到实际写入细节,以及如何使用轻易云平台配置过程中的关键步骤。
调用吉客云接口erp.sales.get获取并加工数据
在数据集成的生命周期中,第一步是调用源系统接口以获取原始数据。在本案例中,我们将重点讨论如何通过轻易云数据集成平台调用吉客云的erp.sales.get
接口,并对获取的数据进行初步加工。
元数据配置解析
首先,我们需要理解元数据配置中的各个字段及其作用。以下是对元数据配置的详细解析:
{
"api": "erp.sales.get",
"effect": "QUERY",
"method": "POST",
"number": "channelCode",
"id": "channelId",
"request": [
{"field": "pageIndex", "label": "页码(默认0)", "type": "int"},
{"field": "pageSize", "label": "每页页数(默认50)", "type": "int", "value": "50"},
{"field": "code", "label": "编号", "type": "string"},
{"field": "name", "label": "名称", "type": "string"},
{"field": "gmtModifiedStart", "label": "起始修改时间", "type": "string",
"value":"{{LAST_SYNC_TIME|datetime}}"},
{"field": "gmtModifiedEnd", "label":"结束修改时间","type":"string","value":"{{CURRENT_TIME|datetime}}"},
{"field":"isBlockup","label":"是否停用,1-是,0-否","type":"int","value":"1"},
{"field":"isDelete","label":"是否删除,1-是,0-否","type":"int"}
],
“autoFillResponse”: true
}
请求参数详解
- api:
erp.sales.get
,表示我们要调用的吉客云API接口。 - effect:
QUERY
,表示此操作为查询操作。 - method:
POST
,表示使用HTTP POST方法进行请求。 - number:
channelCode
,用于标识销售渠道的编号字段。 - id:
channelId
,用于标识销售渠道的唯一ID字段。
请求参数列表:
- pageIndex:页码,默认为0。
- pageSize:每页记录数,默认为50。
- code:销售渠道编号,可选参数。
- name:销售渠道名称,可选参数。
- gmtModifiedStart:起始修改时间,使用模板变量
{{LAST_SYNC_TIME|datetime}}
自动填充上次同步时间。 - gmtModifiedEnd:结束修改时间,使用模板变量
{{CURRENT_TIME|datetime}}
自动填充当前时间。 - isBlockup:是否停用,固定值为1(是)。
- isDelete:是否删除,可选参数。
实际调用与数据处理
在实际操作中,我们需要按照上述配置构建请求,并发送给吉客云API。以下是一个示例请求体:
{
“pageIndex”: 0,
“pageSize”: 50,
“gmtModifiedStart”: “2023-09-01T00:00:00Z”,
“gmtModifiedEnd”: “2023-09-30T23:59:59Z”,
“isBlockup”: 1
}
通过轻易云平台,我们可以自动生成并发送上述请求。响应结果会包含符合条件的销售渠道信息。由于设置了autoFillResponse=true
,平台会自动处理响应并填充到相应的数据结构中。
数据清洗与转换
获取到原始数据后,需要对其进行清洗和转换,以便后续处理。常见的数据清洗步骤包括:
- 去除无效记录:过滤掉已删除或不符合业务规则的记录。例如,根据字段
isDelete=1
来过滤已删除的数据。 - 字段映射与转换:将源系统中的字段映射到目标系统所需的字段。例如,将吉客云中的
channelCode
映射为目标系统中的salesChannelCode
。
示例代码如下:
def clean_and_transform(data):
cleaned_data = []
for record in data:
if record['isDelete'] == 0:
transformed_record = {
'salesChannelCode': record['channelCode'],
'salesChannelId': record['channelId'],
'name': record['name'],
'modifiedTime': record['gmtModified']
}
cleaned_data.append(transformed_record)
return cleaned_data
通过上述步骤,我们完成了从调用源系统API获取数据,到初步清洗和转换的全过程。这为后续的数据写入和进一步处理奠定了基础。
使用轻易云数据集成平台进行ETL转换并写入MySQL API接口
在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是关键的一步。本文将深入探讨如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,转为目标平台 MySQL API 接口所能够接收的格式,并最终写入目标平台。
数据请求与清洗
在数据请求与清洗阶段,我们从源系统获取原始数据,并对其进行初步处理,以确保数据的准确性和一致性。这一步通常包括去重、填补缺失值、标准化字段等操作。假设我们已经完成了这一阶段,接下来将重点放在数据转换与写入阶段。
数据转换与写入
为了将清洗后的数据转为 MySQL API 接口所能接收的格式,我们需要配置相应的元数据。以下是一个详细的元数据配置示例:
{
"api": "execute",
"effect": "EXECUTE",
"method": "POST",
"idCheck": true,
"request": [
{
"field": "main_params",
"label": "主参数",
"type": "object",
"describe": "111",
"value": "1",
"children": [
{"field": "id", "label": "销售渠道id", "type": "int", "value": "{channelId}"},
{"field": "channel_code", "label": "渠道编码", "type": "string", "value": "{channelCode}"},
{"field": "channel_name", "label": "销售渠道名称", "type": "string", "value": "{channelName}"},
{"field": "channel_type",
"label":
"渠道类型",
"type":
"string",
"describe":
"渠道类型(0:分销办公室; 1:直营网店;2:直营门店;3:销售办公室;4:货主虚拟店;5:分销虚拟店;6:加盟门店;7:内部交易渠道)",
"value":
"{channelType}"
},
{"field":"plat_code","label":"店铺平台编码","type":"string","value":"{onlinePlatTypeCode}"},
{"field":"plat_name","label":"店铺平台名称","type":"string","value":"{onlinePlatTypeName}"},
{"field":"depart_name","label":"负责部门名称","type":"string","value":"{channelDepartName}"},
{"field":"link_man","label":"联系人","type":"string","value":"{linkMan}"},
{"field":"link_tel","label":"联系电话","type":"string","value":"{linkTel}"},
{"field":"office_address","label":"办公地址","type":"string","value":"{officeAddress}"},
{"field":"company_name","label":"公司名称","type":"string","value":"{companyName}"},
{"field":"country_name","label":"国家","type":"string","value":"{countryName}"},
{"field":"province_name","label":"省","type":"string","value":"{provinceName}"},
{"field":
"city_name",
"label":
"市",
"type":
"string",
"value":
"{cityName}"
},
{"field":
"town_name",
"label":
"镇,区",
"type":
"string",
"value":
"{townName}"
},
{"field":
"street_name",
"label":
"街道",
"type":
"string",
"value":
"{streetName}"
},
{"field":
"memo",
"label":
"备注",
"type":
"string",
"value":
"{memo}"
},
{"field":
"warehouse_code",
"label":
"默认仓库编码",
type:
string,
value:
{warehouseCode}
},
{
field:
warehouse_name,
label:
默认仓库名称,
type:
string,
value:
{warehouseName}
},
{
field:
charge_type,
label:
结算方式,
type:
string,
describe:
结算方式(1:担保交易;2:银行收款;3:现金收款;4:货到付款;5:欠款计应收;6:客户预存款;7:多种结算;8:退换货冲抵;9:电子钱包)
value:
{chargeType}
},
{
field: depart_code, label: 部门编码, type: string, value: {departCode}
},
{
field: company_code, label: 公司编码, type: string, value: {departCode}
},
{
field: is_blockup, label: 是否停用, type: int, describe: 1-是,0-否,value:1,default:"1"
},
{
field:is_delete,label:"是否删除",type:"int",describe:"1-是,0-否"
},
{
field:create_by,label:"创建人",type:"string",value:"系统自动",default:"系统自动"
},
{
field:update_by,label:"修改人",type:"string",value:"系统自动",default:"系统自动"
}
]
}
],
otherRequest:[
{
field: main_sql,label: 主语句,type:string,describe:111,value:`REPLACE INTO lehua.sc_sale_channel(
id,
channel_code,
channel_name,
channel_type,
plat_code,
plat_name,
depart_name,
link_man,
link_tel,
office_address,
company_name,
country_name,
province_Name
city_name
town_name
street_name
memo
warehouse_code
warehouse_name
charge_type
depart_code
company_code
is_blockup
is_delete
create_by
update_by)
VALUES(
<{id}>,
<{channel_code}>,
<{channel_name}>,
<{channel_type}>,
<{plat_code}>,
<{plat_name}>,
<{depart_name}>,
<{link_man}>,
<{link_tel}>,
<{office_address}>,
<{company_name}>,
<{country_name}>
<{province_Name}>
<{city_Name}>
<{town_Name}>
<{street_Name}>
<{memo}>
<{warehouse_Code}>
<{warehouse_Name}>
<charge_Type>
depart_Code>
company_Code>
is_blockup>
is_delete>
create_by>
update_by>);
`
}
]
}
配置解析
-
API配置:
api
: 指定执行操作。effect
: 设置为EXECUTE
表示执行操作。method
: HTTP方法,设置为POST
。
-
主参数配置:
main_params
: 包含所有需要传递的数据字段,每个字段都有详细的描述和类型定义。例如,id
表示销售渠道ID,类型为整数。
-
SQL语句:
main_sql
: 包含实际执行的SQL语句,通过占位符<{...}>
来插入实际的数据值。
数据写入
通过上述配置,我们可以将清洗后的数据转换为MySQL数据库所需的格式,并使用HTTP POST方法将其写入目标数据库。每个字段都经过了严格定义和映射,确保了数据的一致性和完整性。
这种方式不仅提高了数据处理效率,还保证了每个环节的透明度和可追溯性。通过实时监控和日志记录,可以随时检查和调整数据流动状态,确保业务流程顺畅无误。