吉客云数据集成到MySQL的技术案例分享
在本次技术案例中,我们将重点探讨如何使用轻易云数据集成平台,将吉客云的其他入库单查询结果高效、可靠地集成到MySQL数据库。集成方案名称为“吉客云-其他入库单查询-->BI拉伯塔-其他入库单表”。该方案旨在通过API接口erp.storage.goodsdocin.v2
获取吉客云的数据,并利用MySQL API batchexecute
进行批量写入,实现大量数据的快速存储和处理。
为了确保整个数据对接过程顺利进行,本方案特别关注以下几个关键点:
-
定时抓取与分页处理:通过设置定时任务,定期调用吉客云接口以抓取最新的其他入库单数据。同时,为了应对可能存在的大量记录,我们进行了分页和限流操作,以确保每次请求的数据规模可控,并避免超出系统负荷。
-
自定义转换逻辑与格式差异解决:针对吉客云返回的数据结构,与目标MySQL数据库字段不完全匹配的问题,通过自定义转换逻辑,对原始数据进行清洗和映射,保证写入过程中的一致性和完整性。
-
高吞吐量写入能力:轻易云平台提供了强大的批量处理功能,使得我们能有效快速地将大量记录一次性写入到MySQL。这对于提升系统性能和响应速度至关重要,同时也减少系统资源消耗。
-
实时监控与异常处理机制:采用集中化监控工具来实时跟踪每个集成任务的执行状态,一旦检测到错误或异常情况,触发告警并启动自动重试机制,最大限度地保障数据传输不中断、不漏单。
-
日志记录与审计:实现全流程日志记录,从API调用开始,到最终写入完成,每一步都有详细的日志信息供查阅。这不仅有助于问题排查,也为后续优化提供了宝贵的数据支持。
以上是此次技术案例开头部分主要内容。在接下来的讨论中,将逐步展开具体实施步骤及代码示例,包括如何正确配置API调用、实现分页抓取策略、自定义转化函数,以及优化批量插入性能的方法。
调用吉客云接口erp.storage.goodsdocin.v2获取并加工数据的技术案例
在数据集成过程中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用吉客云接口erp.storage.goodsdocin.v2
来获取并加工数据。
接口配置与请求参数
首先,我们需要配置接口的元数据。根据提供的元数据配置,我们可以看到该接口采用POST方法,主要参数包括分页信息、入库单号、创建时间、更新时间、入库类型等。以下是具体的请求参数配置:
{
"api": "erp.storage.goodsdocin.v2",
"method": "POST",
"number": "goodsdocNo",
"id": "recId",
"pagination": {
"pageSize": 10
},
"request": [
{"field": "pageIndex", "label": "分页页码", "type": "int"},
{"field": "pageSize", "label": "分页页数", "type": "int", "value": "100"},
{"field": "goodsDocNo", "label": "入库单号", "type": "string"},
{"field": "startDate", "label": "创建时间的起始时间", "type": "string"},
{"field": "endDate", "label": "创建时间的结束时间", "type": "string"},
{"field": "gmtModifiedStart",
"label": "主表更新时间起始",
"type":"string",
"value":"_function from_unixtime(({LAST_SYNC_TIME}-86400),'%Y-%m-%d %H:%i:%s')"
},
{"field":"gmtModifiedEnd",
"label":"主表更新时间截至",
"type":"string",
"value":"_function from_unixtime(({CURRENT_TIME}),'%Y-%m-%d %H:%i:%s')"
},
{"field":"inouttype","label":"入库类型","type":"string","value":"104"},
{"field":"sourceBillNo","label":"原始单号","type":"string"},
{"field":"warehouseCode","label":"仓库编号","type":"string"},
{"field":"outBillNo","label":"外部单号","type":"string"},
{"field":"vendCode","label":"供应商编号(往来单位)","type":"string"},
{"field":"billNo","label":"上游单据号(关联单号)","type":"string"},
{"field":"userName","label":"创建人名称","type":"string"},
{
"field" : "selelctFields",
"label" : "需要返回的字段",
"type" : "string",
"value" : "recId,goodsdocNo,billNo,inOutDate,gmtCreate,inouttype,inouttypeName,vendCustomerCode,vendCustomerName,currencyCode,currencyRate,userName,warehouseCode,warehouseName,comment,memo,logisticName,logisticNo,companyId,companyName,logisticType,logisticCode,inOutReason,sourceBillNo,channelId,channelCode,channelName,redStatus,field1,field2,field3,field4,field5..."
}
],
...
}
数据请求与清洗
在实际操作中,我们会先构建一个HTTP POST请求,包含上述所有必要参数。为了保证数据的一致性和准确性,我们需要特别注意以下几点:
- 分页处理:由于数据量可能较大,需要进行分页处理。每次请求时设置
pageIndex
和pageSize
,确保能够逐页获取所有数据。 - 时间过滤:通过
gmtModifiedStart
和gmtModifiedEnd
字段,可以实现对更新时间范围内的数据进行过滤。这两个字段使用了函数转换,将时间戳转换为标准时间格式。 - 入库类型:固定值为104,即“其他入库”,确保只获取特定类型的入库单。
示例代码如下:
import requests
import json
url = 'https://api.jikecloud.com/erp.storage.goodsdocin.v2'
headers = {'Content-Type': 'application/json'}
payload = {
'pageIndex': 1,
'pageSize': 100,
'gmtModifiedStart': '2023-01-01 00:00:00',
'gmtModifiedEnd': '2023-01-31 23:59:59',
'inouttype': '104',
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
data = response.json()
数据转换与写入
获取到原始数据后,需要对其进行清洗和转换,以便写入目标系统。在这个过程中,可以利用轻易云平台提供的数据处理工具,对字段进行映射、格式转换等操作。
例如,将返回的数据中的日期字段从字符串格式转换为日期对象,或者将某些字段值进行标准化处理:
from datetime import datetime
def clean_data(raw_data):
for record in raw_data:
record['inOutDate'] = datetime.strptime(record['inOutDate'], '%Y-%m-%d %H:%M:%S')
# 更多清洗操作...
cleaned_data = clean_data(data['result'])
最后,将清洗后的数据写入目标系统,例如BI拉伯塔中的其他入库单表:
def write_to_target_system(cleaned_data):
# 假设目标系统有一个API可以接收批量插入的数据
target_url = 'https://bi.labata.com/api/v1/storage'
response = requests.post(target_url, headers=headers, data=json.dumps(cleaned_data))
write_to_target_system(cleaned_data)
通过以上步骤,我们完成了从吉客云接口获取并加工数据的全过程。在实际项目中,可以根据具体需求调整参数和处理逻辑,以达到最佳效果。
数据集成生命周期中的ETL转换与写入
在数据集成的过程中,将源平台的数据转换为目标平台所能接受的格式是至关重要的一步。本文将详细探讨如何利用轻易云数据集成平台将吉客云的“其他入库单查询”数据转换并写入到BI拉伯塔的MySQL数据库中。
数据请求与清洗
在进行ETL转换之前,首先需要从源系统吉客云中获取“其他入库单查询”的数据。这一步骤涉及到数据请求和清洗,确保获取的数据符合目标平台的需求。
数据转换与写入
接下来,我们重点关注如何将清洗后的数据进行转换,并通过MySQL API接口写入到目标平台。以下是元数据配置及其应用示例:
{
"api": "batchexecute",
"effect": "EXECUTE",
"method": "SQL",
"number": "id",
"id": "id",
"name": "id",
"idCheck": true,
"request": [
{"field":"recId","label":"入库单ID","type":"string","value":"{recId}"},
{"field":"goodsdocNo","label":"入库单号","type":"string","value":"{goodsdocNo}"},
{"field":"billNo","label":"上游单据号(关联单号)","type":"string","value":"{billNo}"},
{"field":"inOutDate","label":"入库时间","type":"string","value":"{{inOutDate|datetime}}"},
{"field":"gmtCreate","label":"创建时间","type":"string","value":"{{gmtCreate|datetime}}"},
{"field":"inouttype","label":"入库类型编码(100-期初库存 101-采购入库...)","type":"string","value":"{inouttype}"},
{"field":"inouttypeName","label":"入库类型名称(100-期初库存 101-采购入库...)","type":"string","value":"{inouttypeName}"},
{"field":"vendCustomerCode","label":"往来单位编号","type":"string","value":"{vendCustomerCode}"},
{"field":"vendCustomerName","label":"往来单位名称","type":"string","value":"{vendCustomerName}"},
{"field":"currencyCode","label":"币种编号","type":"string","value":"{currencyCode}"},
{"field":"currencyRate","label":"币种汇率","type":"string","value":"{currencyRate}"},
{"field":"userName","label":"业务员名字","type":"string","value":"{userName}"},
{"field":"","label":"","type":"","value":""}
],
"otherRequest": [
{
"field": "main_sql",
"label": "主语句",
"type": "string",
"describe": "SQL首次执行的语句,将会返回:lastInsertId",
"value":
`REPLACE INTO erp_other_storage_goodsdocin
(recId, goodsdocNo, billNo, inoutDate, gmtCreate, inouttype, inouttypeName, vendCustomerCode, vendCustomerName, currencyCode, currencyRate, userName)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
},
{
"field": "limit",
"label": "limit",
"type": "string",
"value": "1000"
}
]
}
SQL语句解析
上述配置中的main_sql
字段定义了一个SQL插入语句,用于将数据写入MySQL数据库。该语句使用了占位符?
,这些占位符将在实际执行时被具体的数据值替换。
例如:
REPLACE INTO erp_other_storage_goodsdocin
(recId, goodsdocNo, billNo, inoutDate, gmtCreate, inouttype, inouttypeName, vendCustomerCode, vendCustomerName, currencyCode, currencyRate, userName)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
在执行时,系统会自动将占位符替换为相应的数据值,如:
REPLACE INTO erp_other_storage_goodsdocin
(recId, goodsdocNo, billNo, inoutDate, gmtCreate, inouttype, inouttypeName, vendCustomerCode,vendCustomerName,currencyCode,currencyRate,userName)
VALUES ('12345', 'IN20230901', 'UP20230901', '2023-09-01 10:00:00', '2023-09-01 09:00:00', '101', '采购入库', 'CUST001', '客户A', 'CNY', '6.5', '张三')
数据类型转换
在元数据配置中,字段inOutDate
和gmtCreate
等日期字段需要进行格式转换。通过使用模板字符串如{{inOutDate|datetime}}
,可以确保日期格式符合目标平台要求。
批量执行
为了提高效率,可以使用批量执行API (batchexecute
) 一次性处理多条记录。通过设置合适的批处理大小(如配置中的limit:1000
),可以有效减少网络延迟和资源消耗。
异常处理
在实际操作中,需要考虑异常处理机制。例如,当某条记录插入失败时,应记录错误信息并继续处理后续记录,以确保整体流程的稳定性和可靠性。
总结
通过上述步骤,我们实现了从吉客云到BI拉伯塔MySQL数据库的数据ETL转换和写入。在此过程中,充分利用了轻易云数据集成平台提供的元数据配置功能,实现了高效、可靠的数据集成。