ETL流程:从金蝶云星空数据获取到MySQL写入的完整实现

  • 轻易云集成顾问-吕修远

CRM-金蝶客户同步-修改:金蝶云星空数据集成到MySQL的技术实现

在本案例中,我们将探讨如何通过轻易云数据集成平台,将金蝶云星空客户数据高效地同步至MySQL数据库。这一过程主要利用了“CRM-金蝶客户同步-修改”方案,结合API接口executeBillQuery和execute,实现大批量数据的快速写入、实时监控以及异常处理。

数据获取与转换

首先,通过调用金蝶云星空提供的API接口executeBillQuery抓取客户信息。该接口支持定时调度机制,使得系统能够可靠地在预设时间周期内获取最新的数据。此外,为了应对分页和限流问题,我们设计了智能分段查询策略,确保每次请求获得的数据量最优且不会触发API调用限制。

为了适应业务需求和目标数据库的数据结构,在抓取到原始数据后需要进行必要的转换处理。我们使用自定义逻辑将原始JSON格式的数据映射为符合MySQL表结构的记录,为后续写入操作做好准备。在这一环节,可视化的数据流设计工具成为关键,简化了复杂转换规则的创建与管理,提高配置效率及准确性。

高吞吐量的数据写入

数据转化后的记录通过执行MySQL写入API execute 实现高速批量插入。在此过程中,高吞吐能力使得大量数据能够迅速、安全地被存储到MySQL数据库中,有效保障了系统性能及用户体验。同时,集中式的监控系统可实时追踪任务状态及性能指标,当出现异常情况例如网络波动或资源不足时,即时触发告警并启动错误重试机制,以最大程度减少对业务连续性的影响。

本篇文章仅是起点,下文将详细介绍整个集成流程中的各个技术细节,包括具体代码示例、参数配置以及最佳实践等,以确保您能全面掌握这一解决方案并应用于实际项目中。 用友与MES系统接口开发配置

调用金蝶云星空接口executeBillQuery获取并加工数据

在轻易云数据集成平台的生命周期中,调用源系统接口是数据集成的第一步。本文将详细探讨如何通过调用金蝶云星空的executeBillQuery接口来获取并加工客户数据。

接口配置与请求参数

在元数据配置中,executeBillQuery接口被定义为一个POST请求,用于查询客户信息。以下是主要的请求参数及其配置:

  • FormId: 业务对象表单Id,必须填写金蝶的表单ID,例如:BD_Customer
  • FieldKeys: 需查询的字段key集合,格式为数组,通过解析器转换为字符串。
  • FilterString: 过滤条件,用于筛选特定的数据,例如:FUseOrgId.Fnumber='T00' and (FModifyDate>='{{LAST_SYNC_TIME|datetime}}' or FForbidDate>='{{LAST_SYNC_TIME|datetime}}')
  • Limit: 最大行数,用于分页控制。
  • StartRow: 开始行索引,用于分页控制。
  • TopRowCount: 返回总行数,用于分页控制。

请求示例

以下是一个完整的请求示例:

{
  "FormId": "BD_Customer",
  "FieldKeys": "FCUSTID,FNumber,FName,FCreateOrgId.FNumber,FUseOrgId.FNumber,FDescription,FIsTrade,FCustTypeId.FNumber,FGroup.FNumber,FSALDEPTID.FNumber,FSELLER.FNumber,FSETTLETYPEID.FName,FRECCONDITIONID.FName,FDISCOUNTLISTID.FNumber,FPRICELISTID.FNumber,FTRANSLEADTIME,FInvoiceType,FTaxType.FNumber,FShortName,FADDRESS,FZIP,FWEBSITE,FTEL,FFAX,FCompanyClassify.FNumber,FCompanyNature.FNumber,FCompanyScale.FNumber,FINVOICETITLE,FTAXREGISTERCODE,FINVOICEBANKNAME,FINVOICETEL,FINVOICEBANKACCOUNT,FINVOICEADDRESS,FSUPPLIERID.FNumber,FIsGroup,FIsDefPayer,FGROUPCUSTID.FNumber,FCOUNTRY1.FNumber,FBANKCODE,FACCOUNTNAME,FBankTypeRec.FNumber,FTextBankDetail,FBankDetail.FNumber,FOpenAddressRec,FCNAPS,FCURRENCYID.fname,FDefaultContact.FNumber,FCOUNTRY.fnumber",
  "FilterString": "FUseOrgId.Fnumber='T00' and (FModifyDate>='{{LAST_SYNC_TIME|datetime}}' or FForbidDate>='{{LAST_SYNC_TIME|datetime}}')",
  "Limit": "2000",
  "StartRow": "0",
  "TopRowCount": 1
}

数据清洗与加工

获取到原始数据后,需要对数据进行清洗和加工,以满足业务需求。以下是一些常见的数据清洗与加工操作:

  1. 字段映射与重命名:将原始字段映射到目标系统所需的字段。例如,将FName映射为目标系统中的客户名称字段。
  2. 数据类型转换:将字符串类型的数据转换为目标系统所需的数据类型。例如,将日期字符串转换为日期对象。
  3. 缺失值处理:处理缺失值或默认值。例如,如果某个字段为空,可以设置一个默认值。

实现代码示例

以下是一个Python代码示例,展示了如何调用接口并处理返回的数据:

import requests
import json

# 定义请求参数
payload = {
    "FormId": "BD_Customer",
    "FieldKeys": ",".join([
        "FCUSTID", "FNumber", "FName", 
        # ...其他字段...
        "FCOUNTRY.fnumber"
    ]),
    "FilterString": "FUseOrgId.Fnumber='T00' and (FModifyDate>='2023-01-01' or FForbidDate>='2023-01-01')",
    "Limit": "2000",
    "StartRow": "0",
    "TopRowCount": 1
}

# 发起POST请求
response = requests.post(
    url="https://api.kingdee.com/executeBillQuery",
    headers={"Content-Type": "application/json"},
    data=json.dumps(payload)
)

# 检查响应状态码
if response.status_code == 200:
    data = response.json()

    # 数据清洗与加工
    processed_data = []
    for item in data:
        processed_item = {
            'CustomerID': item['FCUSTID'],
            'CustomerCode': item['FNumber'],
            'CustomerName': item['FName'],
            # ...其他字段映射...
        }
        processed_data.append(processed_item)

    # 输出处理后的数据
    print(json.dumps(processed_data, indent=4))
else:
    print(f"Error: {response.status_code}, {response.text}")

通过上述步骤,我们可以高效地从金蝶云星空获取客户数据,并进行必要的清洗和加工,为后续的数据集成奠定基础。 企业微信与OA系统接口开发配置

数据集成平台生命周期的第二步:ETL转换与写入MySQL

在数据集成过程中,ETL(提取、转换、加载)是关键的一环。本文将重点探讨如何将已经集成的源平台数据进行ETL转换,并通过MySQL API接口写入目标平台。我们将详细解析元数据配置,并展示实际操作步骤。

元数据配置解析

在本案例中,我们需要将CRM系统中的客户数据同步到金蝶系统,最终写入MySQL数据库。以下是元数据配置的关键部分:

{
  "api": "execute",
  "effect": "EXECUTE",
  "method": "SQL",
  "number": "id",
  "id": "id",
  "name": "id",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "主参数",
      ...
    }
  ],
  ...
}

该配置定义了一个API调用,使用execute方法执行SQL语句。main_params包含了所有需要传递给SQL语句的动态参数。

数据请求与清洗

在ETL过程中,首先需要从源系统提取数据,并进行必要的清洗和格式转换。例如,从CRM系统提取的数据可能包含各种格式和冗余信息,需要通过脚本或工具进行标准化处理。

数据转换与写入

接下来,我们重点关注如何将清洗后的数据转换为目标平台可接受的格式,并通过API接口写入MySQL数据库。以下是具体步骤:

  1. 定义SQL语句: 在元数据配置中,我们定义了一条用于更新客户信息的SQL语句:

    update wk_wodtop_customer 
    set customer_name=:customer_name,
       creating_org=:creating_org,
       using_org=:using_org,
       abbreviation=:abbreviation,
       customer_code=:customer_code,
       short_code=:short_code,
       address=:address,
       country=:country,
       region=:region,
       province=:province,
       city=:city,
       mailing_address=:mailing_address,
       postal_code=:postal_code,
       company_website=:company_website,
       legal_representative=:legal_representative,
       registered_capital=:registered_capital,
       establishment_date=NULLIF(:establishment_date,''),
       industry=:industry,
       registered_address=:registered_address,
       phone_number=:phone_number,
       fax_number=:fax_number,
       company_category=:company_category,
       company_nature=:company_nature,
       company_scale=:company_scale,
       invoice_title=:invoice_title,
       taxpayer_regist_number=:taxpayer_regist_number,
       bank_of_deposit=:bank_of_deposit,
       bank_account_number=:bank_account_number,
       invoicing_phone_number=:invoicing_phone_number,
       invoicing_mail_address=:invoicing_mail_address,
       uni_social_credit_code=:uni_social_credit_code,
       corresp_supplier=:corresp_supplier,
       corresp_corp_customer=:corresp_corp_customer,
       corporate_customer=:corporate_customer,
       default_payer=:default_payer,
       customer_category=:customer_category,
       customer_group=:customer_group,
       corresponding_org=:corresponding_org,
       settlement_currency=:settlement_currency,
        sales_depart =:sales_depart, 
        salesman =:salesman, 
        frozen_status =:frozen_status, 
        frozen_scope =:frozen_scope, 
        freezer =:freezer, 
        freeze_date=NULLIF(:freeze_date,''), 
        settlement_method =:settlement_method, 
        payment_terms =:payment_terms, 
        price_list =:price_list, 
        discount_list =:discount_list, 
        settlement_card =:settlement_card, 
        settling_party =:settling_party, 
        payer =:payer, 
        default_contact_person =:default_contact_person, 
        mandat_contact_person =:mandat_contact_person, 
        lead_time_transportat =:lead_time_transportat, 
        tax_category =:tax_category, 
        invoice_type =:invoice_type, 
        default_tax_rate =:default_tax_rate, 
        enable_credit_manage =:enable_credit_manage, 
        customer_priority =:customer_priority, 
        other_details =:other_details, 
        document_status =:document_status, 
        disabler =:disabler,  
    disable_date=NULLIF(:disable_date,''),  
    disable_status= :disable_status ,  
    company_id= :company_id ,F_IsAutoMTO= :F_IsAutoMTO ,Is_del= :Is_del  
    where data_id= :data_id
  2. 参数映射: 将源系统的数据字段映射到目标系统的字段。例如:

    {
     "field": "customer_name",
     "label": "客户名称",
     "type": "string",
     "value": "{FName}"
    }
  3. 执行API调用: 使用轻易云平台提供的API接口,将映射后的参数传递给预定义的SQL语句并执行。例如,通过HTTP POST请求发送如下JSON payload:

    {
     "main_params": {
         ...
         // 所有映射后的参数
         ...
     },
     "main_sql": "...", // 上述定义的SQL语句
    }
  4. 处理返回结果: 执行完毕后,处理API返回的结果,例如确认记录是否成功更新或插入。

实际操作示例

假设我们从CRM系统获取了一条客户记录,需要同步到金蝶系统并写入MySQL数据库。具体操作如下:

  1. 从CRM系统提取原始数据。
  2. 清洗并标准化数据。
  3. 根据元数据配置,将原始字段映射为目标字段。
  4. 构建API请求payload并发送。
  5. 检查返回结果,确保数据成功写入。

通过上述步骤,我们实现了从源系统到目标系统的数据无缝对接,并确保了数据的一致性和完整性。这种基于轻易云平台的数据集成方式,不仅提高了效率,还极大地简化了复杂的数据处理流程。 钉钉与CRM系统接口开发配置

更多系统对接方案