返回博客
工程AI管线

从提示词到生产环境:用 AI 在几分钟内构建数据管线

了解 AIP AI 管线构建器如何将自然语言描述转化为基于 Doris 的生产级 Flink SQL 管线,附带血缘、契约与监控。

Coomia 团队发布于 2025年1月10日6 分钟阅读
分享本文Twitter / X

#传统方式已经过时

构建数据管线曾经是一个耗时数周的工程。先收集业务需求,设计 DAG,编写提取逻辑,处理 Schema 映射,添加错误处理,配置监控,设置告警,编写测试,部署到预发环境,验证输出,最后推送到生产环境。

一个简单的「从系统 A 搬数据到系统 B」的任务,两周的工程时间就算快了。如果涉及实时处理、Schema 演进或多源 Join,一个月以上的周期并不罕见。

AIP AI 管线构建器彻底改变了这一切。

#工作原理

管线构建器接受自然语言描述。底层将针对数据工程模式微调的大语言模型与确保正确性的确定性代码生成引擎结合在一起。

来看一个真实案例。用户输入:

「每小时从 PostgreSQL OLTP 拉取新增和更新的订单。按 order_id 去重。计算每个客户的管线价值。全部加载到 Doris 供分析团队使用。」

30 秒内,AIP 生成完整的管线定义:

AIP 管线构建器从自然语言输入生成完整的 ETL 管线
AIP 管线构建器从自然语言输入生成完整的 ETL 管线
Click to zoom
AIP AI 管线构建器 — 从提示词到生产管线

生成的代码使用标准的 Apache Flink SQL 实现批流一体的处理,并以 Apache Doris 作为分析存储目标。没有专有运行时,没有锁定。

SQL
-- jobs/silver_crm_pipeline_value.sql
-- 由 AIP 管线构建器自动生成

-- Source: PostgreSQL CDC
CREATE TABLE bronze_orders (
    order_id      BIGINT,
    customer_id   BIGINT,
    amount        DECIMAL(18, 2),
    status        STRING,
    updated_at    TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname'  = '${PG_HOST}',
    'database-name' = 'sales',
    'schema-name'   = 'public',
    'table-name'    = 'orders'
);

-- Sink: Doris
CREATE TABLE silver_crm_pipeline_value (
    customer_id          BIGINT,
    weighted_value       DECIMAL(18, 2),
    opportunity_count    BIGINT,
    loaded_at            TIMESTAMP(3),
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector'    = 'doris',
    'fenodes'      = '${DORIS_FE}',
    'table.identifier' = 'analytics.silver_crm_pipeline_value'
);

-- 转换:去重 + 聚合 + UPSERT
INSERT INTO silver_crm_pipeline_value
SELECT
    customer_id,
    SUM(amount * stage_probability) AS weighted_value,
    COUNT(*)                        AS opportunity_count,
    CURRENT_TIMESTAMP               AS loaded_at
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY order_id
               ORDER BY updated_at DESC
           ) AS _dedup_rank
    FROM bronze_orders
    WHERE status IN ('open', 'pending', 'won')
)
WHERE _dedup_rank = 1
GROUP BY customer_id;

#生成的 Doris DDL 与作业清单

YAML
# pipeline.yml — 生成的作业清单
job:
  name: silver_crm_pipeline_value
  engine: flink
  mode: streaming             # batch | streaming | unified
  checkpoint_interval: 60s
  parallelism: 2

doris_ddl:
  database: proj_onto_paas
  table: silver_crm_pipeline_value
  layer: silver               # bronze | silver | gold
  unique_key: customer_id
  partition: none

contracts:
  - type: not_null
    columns: [customer_id]
  - type: freshness
    max_lag_minutes: 60
  - type: schema_drift
    on_breaking_change: alert

lineage:
  upstream:
    - postgres://sales.public.orders
  downstream:
    - doris://proj_onto_paas.silver_crm_pipeline_value
ℹ️Info

无专有运行时。 每一行生成的代码都是标准的 Flink SQL 和 Doris DDL。你可以检查、修改和版本管理所有内容。

#三层架构

AIP 与简单代码生成工具的区别在于其三层架构:

架构图展示 Intent → Pattern → Code 的管线生成流程
架构图展示 Intent → Pattern → Code 的管线生成流程
Click to zoom
三层管线生成架构

#1. 意图层

自然语言理解提取结构化意图:涉及哪些数据源、需要什么转换、目标表是什么样的、有哪些质量要求。

JSON
{
  "intent": {
    "sources": [
      { "system": "postgres", "schema": "public", "tables": ["orders", "customers"] }
    ],
    "transforms": [
      { "type": "dedup", "key": "order_id" },
      { "type": "aggregate", "metric": "pipeline_value", "group_by": "customer_id" }
    ],
    "target": { "system": "doris", "database": "analytics", "layer": "silver" },
    "mode": "streaming",
    "quality": { "freshness_sla_minutes": 60 }
  }
}

#2. 模式层

AIP 将意图与一套久经验证的 Flink SQL 模式库进行匹配 — CDC 摄入、基于 ROW_NUMBER 分区去重、增量聚合、Upsert Sink、Watermark 处理 — 每种模式都在数百个实际部署中经过验证。

#3. 代码层

代码生成器产出确定性的、经过测试的实现代码。与纯 LLM 生成不同,这一层使用模板和约束求解来保证正确性:

YAML
# pipeline_config.yml — 生成的约束
quality_checks:
  - type: not_null
    columns: [customer_id, order_id]
  - type: referential_integrity
    from: orders.customer_id
    to: customers.customer_id
  - type: freshness
    max_age_minutes: 120
    alert_channels: [slack, pagerduty]

retry_policy:
  max_attempts: 3
  backoff: exponential
  initial_delay_seconds: 30

dead_letter:
  enabled: true
  destination: s3://data-lake/dead-letter/crm/
💡Tip

对于真正新颖的需求,AIP 会生成带有明确标记的扩展点的脚手架,让你添加自定义逻辑,而不是盲目猜测。

#性能对比

指标传统方式AIP AI 构建器
首条管线耗时2-4 周< 10 分钟
样板代码行数~500 行0(自动生成)
内置质量检查手动配置自动包含
监控配置独立任务已集成
Schema 演进处理易出错自动检测
批流一体两套代码路径一条 Flink SQL

#立即体验

AI 管线构建器在所有 AIP 版本中可用,包括免费试用。描述一条你一直想构建的管线,看看它多快能上线。

Bash
# 快速开始 — 描述你的管线
mds pipeline create --prompt "从 PostgreSQL 加载每日销售数据,\
  计算 7 天滚动平均,\
  使用 2-sigma 阈值检测异常,\
  输出到 Doris analytics schema"

大多数用户反馈他们的第一条管线从提示词到生产环境不到十分钟

相关文章