钉钉数据集成到MySQL:dd-新转账单(资金调拨)-->mysql(鸿巢)技术案例分享
在实施“dd-新转账单(资金调拨)-->mysql(鸿巢)”项目时,面临的主要挑战是如何高效、可靠地将钉钉平台上的数据集成至内部的MySQL数据库。通过轻易云数据集成平台,我们利用其高级特性,实现了从获取钉钉API接口的数据,到转换及入库全过程的精准控制。
首先,在对接过程中我们使用了v1.0/yida/processes/instances
API,从钉钉中抓取相关业务流程实例的数据。这一步骤涉及到了处理分页和限流问题,为确保不漏单,我们制定了一套定时任务机制,能够可靠地周期性抓取并存储最新的数据。同时,通过设置自定义的重试策略,有效降低了由于网络延迟或API速率限制所导致的问题。
对于输送到MySQL数据库中的数据,我们采用了批量写入的方法,以提升整体吞吐效率。借助可视化的数据流设计工具,将多次复杂操作简化为一系列直观且容易管理的步骤。此外,还使用了执行API execute
进行快速、大量的数据写入,这确保了系统在大规模数据交换环境下依然保持稳定运行。
为了实现与业务需求相适应的数据转换逻辑,自定义脚本被用于转换和映射不同格式的数据。在这一过程中,我们特别注意处理可能存在的不一致类型及结构差异,提高最终储存于MySQL中的数据质量。同时,通过集中监控告警系统,对整个集成过程进行了实时跟踪,迅速发现并解决任何异常情况,使得整个项目顺利落地实施。
调用钉钉接口v1.0/yida/processes/instances获取并加工数据
在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用钉钉接口v1.0/yida/processes/instances
,获取并加工数据,以实现高效的数据集成。
接口调用配置
首先,我们需要配置API调用的元数据。以下是具体的配置细节:
{
"api": "v1.0/yida/processes/instances",
"method": "POST",
"number": "title",
"id": "processInstanceId",
"idCheck": true,
"formatResponse": [
{"old": "dateField_lglvrpp4", "new": "datetime_new", "format": "date"},
{"old": "serialNumberField_lgov9d3b", "new": "order_no_new", "format": "string"}
],
"request": [
{"field": "pageNumber", "label": "分页页码", "type": "string", "describe": "分页页码", "value": "{PAGINATION_START_PAGE}"},
{"field": "pageSize", "label": "分页大小", "type": "string", "describe": "分页大小", "value": "{PAGINATION_PAGE_SIZE}"},
{"field": "appType", "label": "应用ID", "type": "string", "describe": "应用ID",
"value":"APP_WTSCMZ1WOOHGIM5N28BQ"},
{"field":"systemToken","label":"应用秘钥","type":"string","describe":"应用秘钥","value":"IS866HB1DXJ8ODN3EXSVD750RBTK2X72R8MELL4"},
{"field":"userId","label":"用户的userid","type":"string","describe":"用户的userid","value":"16000443318138909"},
{"field":"language","label":"语言","type":"string","describe":"语言,取值:zh_CN:中文(默认值)en_US:英文"},
{"field":"formUuid","label":"表单ID","type":"string","describe":"表单ID","value":"FORM-KW766OD1931ALBVJBHUHLD6KFJ3G3OELLVLGL0"},
{"field":"searchFieldJson","label":"条件","type":"object","children":[{"field":"selectField_lglvrpmg","label":"类型","type":"string","value":"跨组织转账"}]},
{"field":"originatorId","label":"根据流程发起人工号查询","type":"string","describe":"根据流程发起人工号查询"},
{"field":"createFromTimeGMT","label":"创建时间起始值","type":"string","describe":"创建时间起始值",
"value":
"_function DATE_FORMAT(DATE_ADD(NOW(),INTERVAL - 25 DAY),'%Y-%m-%d 00:00:00')"},
{"field":
"createToTimeGMT",
"label":
"创建时间终止值",
"type":
"string",
"describe":
"创建时间终止值",
"value":
"{{CURRENT_TIME|datetime}}"
},
{
"field":
"modifiedFromTimeGMT",
"label":
"修改时间起始值",
"type":
"string",
"describe":
"修改时间起始值"
},
{
"field":
"modifiedToTimeGMT",
"label":
"修改时间终止值",
"type":
"string",
"describe":
"修改时间终止值"
},
{
"field":
"taskId",
"label":
"任务ID",
"type":
"string",
"describe":
"任务ID"
},
{
"field":
"instanceStatus",
"label":
"实例状态",
"type":
"strings
,"describe
:
实例状态
,"value
:
COMPLETED"
},
{
"
approvedResult
,"
label
:
流程审批结果
,"type
:
strings
,"describe
:
流程审批结果
,"value
:
agree"
}
],
"
condition":[[{"field
:"dateField_lglvrpp4"
,"logic
:"notnull"}]]
}
请求参数详解
- pageNumber 和 pageSize:用于分页控制,确保每次请求的数据量可控。
- appType 和 systemToken:分别为应用ID和秘钥,用于身份验证。
- userId 和 language:指定用户和语言环境。
- formUuid 和 searchFieldJson:用于指定表单和搜索条件,其中
searchFieldJson
包含了具体的业务逻辑,如类型为“跨组织转账”。 - createFromTimeGMT 和 createToTimeGMT:用于限定数据的创建时间范围,确保只获取特定时间段内的数据。
- instanceStatus 和 approvedResult:用于过滤已完成且审批通过的实例。
数据格式化与转换
在获取到原始数据后,需要对其进行格式化和转换。元数据配置中的formatResponse
字段定义了这一过程:
- 将
dateField_lglvrpp4
转换为datetime_new
,格式为日期。 - 将
serialNumberField_lgov9d3b
转换为order_no_new
,格式为字符串。
这种转换确保了数据在进入目标系统前已经符合预期格式,提高了数据处理效率。
数据清洗与写入
在完成数据请求和初步清洗后,可以利用轻易云平台提供的工具进一步处理这些数据,并将其写入目标系统(如MySQL数据库)。这一过程包括:
- 数据校验:根据元数据中的条件配置(如字段不为空)进行校验。
- 数据转换:利用平台提供的脚本或函数对数据进行进一步转换。
- 数据写入:通过配置好的连接器,将处理后的数据写入目标数据库。
通过上述步骤,我们可以高效地从钉钉系统中获取所需的数据,并将其无缝集成到其他业务系统中,实现跨系统的数据共享与协同工作。
使用轻易云数据集成平台进行ETL转换并写入MySQL API接口
在数据集成过程中,ETL(Extract, Transform, Load)转换是关键步骤之一。本文将深入探讨如何通过轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,转为目标平台 MySQL API 接口所能够接收的格式,并最终写入目标平台。
数据请求与清洗
在进行ETL转换之前,首先需要完成数据的请求与清洗。这一步骤确保从源系统获取的数据是准确且符合要求的。假设我们已经完成了这一步骤,现在我们关注如何将清洗后的数据进行转换并写入目标平台。
数据转换与写入
在轻易云数据集成平台中,元数据配置是实现数据转换和写入的重要环节。以下是一个详细的元数据配置示例:
{
"api": "execute",
"effect": "EXECUTE",
"method": "POST",
"idCheck": true,
"request": [
{
"field": "main_params",
"label": "main_params",
"type": "object",
"describe": "111",
"children": [
{
"field": "extend_processInstanceId",
"label": "明细id",
"type": "string",
"value": "{{extend.processInstanceId}}"
},
{
"field": "order_no_new",
"label": "单号",
"type": "string",
"value": "{order_no_new}(ZJDB)"
},
{
"field": "datetime_new",
"label": "时间",
"type": "date",
"value": "{datetime_new}"
},
{
"field": "qty_count",
"label": "数量",
"type": "string",
"value":"1"
},
{
"field":"sales_count",
"label":"金额",
"type":"string",
"value":"{numberField_lglvrpn8}"
},
{
“field”:”status”,
“label”:”状态”,
“type”:”string”
},
{
“field”:”Document_Type”,
“label”:”单据类型”,
“type”:”string”,
“value”:”资金调拨”
}
]
}
],
"otherRequest":[
{
“field”:”main_sql”,
“label”:”main_sql”,
“type”:”string”,
“describe”:”111”,
“value”:”
INSERT INTO `hc_dd_zjdb` (
`extend_processInstanceId`,
`order_no_new`,
`datetime_new`,
`qty_count`,
`sales_count`,
`status`,
`Document_Type`
) VALUES (
:extend_processInstanceId,
:order_no_new,
:datetime_new,
:qty_count,
:sales_count,
:status,
:Document_Type
)
”
}
]
}
元数据配置解析
-
API接口定义
"api":"execute"
:指定要执行的操作。"effect":"EXECUTE"
:定义操作效果为执行。"method":"POST"
:使用HTTP POST方法。
-
请求参数
"request"
部分定义了请求参数,包括字段名称、标签、类型和描述等信息。- 每个字段都包含具体的值来源,例如:
"extend_processInstanceId"
:从{{extend.processInstanceId}}
获取值。"order_no_new"
:从{order_no_new}(ZJDB)
获取值并添加后缀。"datetime_new"
:从{datetime_new}
获取日期值。"qty_count"
:固定值为1
。"sales_count"
:从{numberField_lglvrpn8}
获取金额值。"status"
和"Document_Type"
分别表示状态和单据类型。
-
SQL语句
"otherRequest"
部分定义了要执行的SQL语句,用于将数据插入到目标MySQL数据库中。- SQL语句中的占位符(如
:extend_processInstanceId
)对应于请求参数中的字段。
实现过程
通过上述元数据配置,我们可以实现以下几个步骤:
-
提取(Extract): 从源系统提取所需的数据,并根据业务需求进行初步处理和清洗。
-
转换(Transform): 根据元数据配置,将提取的数据转换为目标系统所需的格式。例如,将订单号添加后缀,将日期格式化等。
-
加载(Load): 使用HTTP POST方法,将转换后的数据通过API接口发送到目标MySQL数据库,并执行插入操作。
这种方式不仅确保了数据的一致性和完整性,还大大简化了跨系统的数据集成过程。通过灵活的元数据配置,我们可以轻松适应不同业务场景下的数据需求,实现高效的数据流转和处理。