案例分享:钉钉数据集成到MySQL
在当今的数据密集型业务环境中,高效可靠的数据集成已成为企业运作的核心环节。本文将深入探讨如何通过一个具体实例,将钉钉新转账单(银行转账)的数据无缝对接到MySQL数据库中,方案名称为:dd-钉钉新转账单(银行转账)-->mysql (鸿巢)。
这个案例利用了轻易云提供的高吞吐量数据写入能力和自定义数据转换逻辑,使得大量来自钉钉接口v1.0/yida/processes/instances的数据能够被快速而准确地写入到MySQL数据库,通过调用execute API完成最终存储。同时,集中监控和告警系统确保了整个过程的实时跟踪和问题预警,从而大幅提升任务执行的可靠性。
针对不同系统间可能存在的数据结构差异,我们采用可视化的数据流设计工具进行直观管理,并且定制了特殊映射规则以满足特定业务需求。此外,为处理分页与限流等API接口常见挑战以及错误重试机制的实现,我们设计了一套完备的解决方案,以确保每一条关键数据都能成功获取并保存,不漏单。
后续部分将详细介绍从配置元数据、调用API获取信息,到高效写入MySQL各个步骤中的技术细节。
调用钉钉接口获取并加工数据
在数据集成过程中,调用源系统接口是至关重要的一步。本文将深入探讨如何通过轻易云数据集成平台调用钉钉接口v1.0/yida/processes/instances
获取数据,并对其进行加工处理。
API接口配置
首先,我们需要配置API接口的元数据。以下是具体的元数据配置:
{
"api": "v1.0/yida/processes/instances",
"method": "POST",
"number": "title",
"id": "processInstanceId",
"idCheck": true,
"formatResponse": [
{"old": "dateField_lglvrpp4", "new": "datetime_new", "format": "date"},
{"old": "serialNumberField_lgov9d3b", "new": "order_no_new", "format": "string"}
],
"request": [
{"field": "pageNumber", "label": "分页页码", "type": "string", "describe": "分页页码", "value": "{PAGINATION_START_PAGE}"},
{"field": "pageSize", "label": "分页大小", "type": "string", "describe": "分页大小", "value": "{PAGINATION_PAGE_SIZE}"},
{"field": "appType", "label": "应用ID", "type": "string", "describe": "应用ID",
"value":"APP_WTSCMZ1WOOHGIM5N28BQ"},
{"field":"systemToken","label":"应用秘钥","type":"string","describe":"应用秘钥","value":"IS866HB1DXJ8ODN3EXSVD750RBTK2X72R8MELL4"},
{"field":"userId","label":"用户的userid","type":"string","describe":"用户的userid","value":"16000443318138909"},
{"field":"language","label":"语言","type":"string","describe":"语言,取值:zh_CN:中文(默认值)en_US:英文"},
{"field":"formUuid","label":"表单ID","type":"string","describe":"表单ID","value":"FORM-KW766OD1931ALBVJBHUHLD6KFJ3G3OELLVLGL0"},
{"field":"searchFieldJson","label":"条件","type":"object","children":[{"field":"selectField_lglvrpmg","label":"类型","type":"string","value":"组织内部转款"}]},
{"field":"originatorId","label":"根据流程发起人工号查询","type":"string","describe":"根据流程发起人工号查询"},
{"field":"createFromTimeGMT","label":"创建时间起始值","type":"string","describe":"创建时间起始值",
"value":"'DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 25 DAY),'%Y-%m-%d 00:00:00')'"},
{"field":"createToTimeGMT","label":"创建时间终止值","type":"string","describe":"创建时间终止值",
"value":"'{{CURRENT_TIME|datetime}}'"},
{"field":"modifiedFromTimeGMT","label":"修改时间起始值","type":"string","describe":"修改时间起始值"},
{"field":"modifiedToTimeGMT","label":"修改时间终止值","type":"string","describe":"修改时间终止值"},
{"field':'taskId','label':'任务ID','type':'string','describe':'任务ID'},
{'field':'instanceStatus','label':'实例状态','type':'string','describe':'实例状态','value':'COMPLETED'},
{'field':'approvedResult','label':'流程审批结果','type':'string','describe':'流程审批结果','value':'agree'}
],
'condition':[[{'field':'dateField_lglvrpp4','logic':'notnull'}]]
}
请求参数解析
在请求参数中,我们需要特别注意以下几个字段:
pageNumber
和pageSize
用于分页控制,确保我们能够逐页获取数据。appType
和systemToken
是应用的身份验证信息,必须正确填写。userId
是调用API的用户标识。formUuid
是表单的唯一标识符。searchFieldJson
用于指定查询条件,例如这里设置了类型为“组织内部转款”。- 时间相关字段如
createFromTimeGMT
和createToTimeGMT
用于限定查询的数据范围。
数据格式化与转换
在获取到原始数据后,我们需要对其进行格式化和转换。元数据配置中的 formatResponse
字段定义了具体的转换规则:
- 将字段
dateField_lglvrpp4
转换为新的字段名datetime_new
,并将其格式化为日期类型。 - 将字段
serialNumberField_lgov9d3b
转换为新的字段名order_no_new
,并将其格式化为字符串类型。
这些转换规则确保了数据在写入目标系统时符合预期格式。
数据请求与清洗
在实际操作中,我们通过发送POST请求来调用钉钉API,并获取返回的数据。示例如下:
import requests
import json
url = 'https://oapi.dingtalk.com/v1.0/yida/processes/instances'
headers = {'Content-Type': 'application/json'}
payload = {
'pageNumber': '1',
'pageSize': '10',
'appType': 'APP_WTSCMZ1WOOHGIM5N28BQ',
'systemToken': 'IS866HB1DXJ8ODN3EXSVD750RBTK2X72R8MELL4',
'userId': '16000443318138909',
'language': 'zh_CN',
'formUuid': 'FORM-KW766OD1931ALBVJBHUHLD6KFJ3G3OELLVLGL0',
'searchFieldJson': {'selectField_lglvrpmg': '组织内部转款'},
'createFromTimeGMT': "'DATE_FORMAT(DATE_ADD(NOW(),INTERVAL -25 DAY),'%Y-%m-%d 00:00:00')'",
'createToTimeGMT': "'{{CURRENT_TIME|datetime}}'",
'instanceStatus': 'COMPLETED',
'approvedResult': 'agree'
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
data = response.json()
数据清洗与写入
获取到的数据需要经过清洗和转换,然后写入目标数据库(如MySQL)。以下是一个简单的数据清洗示例:
def clean_data(raw_data):
cleaned_data = []
for item in raw_data:
cleaned_item = {
'datetime_new': item['dateField_lglvrpp4'],
'order_no_new': item['serialNumberField_lgov9d3b']
}
cleaned_data.append(cleaned_item)
return cleaned_data
最终,将清洗后的数据写入MySQL数据库:
import pymysql
def write_to_mysql(cleaned_data):
connection = pymysql.connect(
host='your_mysql_host',
user='your_mysql_user',
password='your_mysql_password',
database='your_database'
)
cursor = connection.cursor()
for item in cleaned_data:
sql = """
INSERT INTO your_table (datetime_new, order_no_new)
VALUES (%s, %s)
"""
cursor.execute(sql, (item['datetime_new'], item['order_no_new']))
connection.commit()
cursor.close()
connection.close()
# Example usage
raw_data = data['data'] # Assuming the API response contains a key named data
cleaned_data = clean_data(raw_data)
write_to_mysql(cleaned_data)
通过上述步骤,我们实现了从钉钉接口获取数据、进行清洗和转换,并最终写入MySQL数据库的全过程。这一过程展示了如何利用轻易云数据集成平台高效地进行异构系统间的数据集成。
数据集成生命周期中的ETL转换与写入
在数据集成的过程中,ETL(Extract, Transform, Load)是一个至关重要的步骤。本文将深入探讨如何利用轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,并最终写入目标平台MySQL。
元数据配置解析
在本次案例中,我们将处理钉钉新转账单(银行转账)的数据,并将其转换为MySQL API接口所能接收的格式。以下是元数据配置的详细解析:
{
"api": "execute",
"effect": "EXECUTE",
"method": "POST",
"idCheck": true,
"request": [
{
"field": "main_params",
"label": "main_params",
"type": "object",
"describe": "111",
"children": [
{"field": "extend_processInstanceId", "label": "明细id", "type": "string", "value":"{{extend.processInstanceId}}"},
{"field": "order_no_new", "label": "单号", "type": "string", "value":"{order_no_new}(YHZZ)"},
{"field": "datetime_new", "label": "时间", "type": "date", "value":"{datetime_new}"},
{"field": "qty_count", "label": "数量", "type":"string","value":"1"},
{"field":"sales_count","label":"金额","type":"string","value":"{numberField_lglvrpn8}"},
{"field":"status","label":"状态","type":"string"},
{"field":"Document_Type","label":"单据类型","type":"string","value":"银行转账"}
]
}
],
...
}
数据请求与清洗
在这个阶段,我们从钉钉新转账单中获取原始数据。通过元数据配置中的request
部分,可以看到我们需要提取多个字段,包括extend_processInstanceId
、order_no_new
、datetime_new
、qty_count
、sales_count
、status
和Document_Type
。
这些字段的数据类型各异,有字符串、日期和对象类型。在提取过程中,需要确保数据的完整性和准确性。例如,字段order_no_new
需要附加后缀“(YHZZ)”,而字段qty_count
则固定为“1”。
数据转换
在数据清洗完成后,下一步是将这些数据转换为目标平台MySQL所能接受的格式。这一步主要通过SQL语句来实现。在元数据配置中,提供了一个关键字段:
{
...
,"otherRequest":[
{
...
,"value":"INSERT INTO `hc_dd_yhzz`( `extend_processInstanceId`,`order_no_new`,`datetime_new`,`qty_count`,`sales_count`,`status`,`Document_Type`) VALUES (:extend_processInstanceId,:order_no_new,:datetime_new,:qty_count,:sales_count,:status,:Document_Type)"
}
]
}
该SQL语句定义了如何将清洗后的数据插入到MySQL数据库中的表格hc_dd_yhzz
。其中,各个占位符如:extend_processInstanceId
, :order_no_new
, :datetime_new
, :qty_count
, :sales_count
, :status
, 和:Document_Type
会被实际的数据值替换。
数据写入
最后一步是执行上述SQL语句,将转换后的数据写入到目标平台MySQL中。这一步通过API调用来实现,具体方法为POST请求:
{
...
,"method":"POST"
}
在执行过程中,系统会检查ID是否存在(由参数"idCheck"控制),并根据配置好的SQL语句,将数据插入到指定的表格中。
总结
通过上述步骤,我们实现了从钉钉新转账单到MySQL数据库的数据集成。在这个过程中,轻易云数据集成平台提供了全生命周期管理和可视化操作界面,使得每个环节都透明且高效。通过合理配置元数据,我们能够灵活地进行ETL转换,并确保最终的数据写入符合目标平台的要求。