
From Prompt to Production: Building Data Pipelines in Minutes with AI

Learn how the AIP AI Pipeline Builder turns natural language descriptions into production-grade Flink SQL pipelines on Doris — complete with lineage, contracts, and monitoring.

Coomia Team · Published on January 10, 2025 · 5 min read

#The Old Way Was Broken

Building a data pipeline used to be a multi-week affair. First you gather requirements from the business team. Then you design the DAG, write the extraction logic, handle schema mapping, add error handling, configure monitoring, set up alerting, write tests, deploy to staging, validate outputs, and finally push to production.

For a simple "move data from system A to system B" task, two weeks of engineering time was considered fast. For anything involving real-time processing, schema evolution, or multi-source joins, you could easily be looking at a month or more.

The AIP AI Pipeline Builder changes this equation entirely.

#How It Works

The pipeline builder accepts natural language descriptions. Behind the scenes, it combines a large language model fine-tuned on data engineering patterns with a deterministic code generation engine.

Here is a real example. A user types:

"Pull all new and updated orders from our PostgreSQL OLTP every hour. Deduplicate by order_id. Calculate the pipeline value per customer. Load everything into Doris for the analytics team."

Within 30 seconds, AIP generates a complete pipeline definition:

[Screenshot: AIP AI Pipeline Builder — from prompt to production pipeline, generating a complete ETL pipeline from natural language input]

The generated code uses standard Apache Flink SQL for unified batch + streaming processing, and Apache Doris as the target analytical store. No proprietary runtime or lock-in.

SQL
-- jobs/silver_crm_pipeline_value.sql
-- Auto-generated by AIP Pipeline Builder

-- Source: PostgreSQL CDC
CREATE TABLE bronze_orders (
    order_id           BIGINT,
    customer_id        BIGINT,
    amount             DECIMAL(18, 2),
    stage_probability  DECIMAL(5, 4),   -- deal-stage win probability, 0.0000–1.0000
    status             STRING,
    updated_at         TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname'  = '${PG_HOST}',
    'username'  = '${PG_USER}',
    'password'  = '${PG_PASS}',
    'database-name' = 'sales',
    'schema-name'   = 'public',
    'table-name'    = 'orders'
);

-- Sink: Doris
CREATE TABLE silver_crm_pipeline_value (
    customer_id          BIGINT,
    weighted_value       DECIMAL(18, 2),
    opportunity_count    BIGINT,
    loaded_at            TIMESTAMP(3),
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector'    = 'doris',
    'fenodes'      = '${DORIS_FE}',
    'username'     = '${DORIS_USER}',
    'password'     = '${DORIS_PASS}',
    'table.identifier' = 'analytics.silver_crm_pipeline_value'
);

-- Transformation: dedup + aggregate + UPSERT
INSERT INTO silver_crm_pipeline_value
SELECT
    customer_id,
    SUM(amount * stage_probability) AS weighted_value,
    COUNT(*)                        AS opportunity_count,
    CURRENT_TIMESTAMP               AS loaded_at
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY order_id
               ORDER BY updated_at DESC
           ) AS _dedup_rank
    FROM bronze_orders
)
-- Filter AFTER deduplication, so a stale 'open' version of an order
-- can never shadow a newer 'lost' version of the same order.
WHERE _dedup_rank = 1
  AND status IN ('open', 'pending', 'won')
GROUP BY customer_id;
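To make the dedup-then-aggregate logic concrete, here is a plain-Python sketch of the same computation over a batch of order rows (illustrative only — the real job runs as Flink SQL):

```python
def pipeline_value(rows):
    """Keep the latest version of each order, drop closed/lost deals,
    then sum amount * stage_probability per customer."""
    # Deduplicate: latest row per order_id by updated_at.
    latest = {}
    for r in rows:
        cur = latest.get(r["order_id"])
        if cur is None or r["updated_at"] > cur["updated_at"]:
            latest[r["order_id"]] = r
    # Aggregate weighted value and opportunity count per customer.
    out = {}
    for r in latest.values():
        if r["status"] not in ("open", "pending", "won"):
            continue
        agg = out.setdefault(r["customer_id"],
                             {"weighted_value": 0.0, "opportunity_count": 0})
        agg["weighted_value"] += r["amount"] * r["stage_probability"]
        agg["opportunity_count"] += 1
    return out
```

Because deduplication happens before the status filter, only the most recent version of each order decides whether it counts toward the pipeline value.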

#Generated Doris DDL and Job Manifest

YAML
# pipeline.yml — generated manifest
job:
  name: silver_crm_pipeline_value
  engine: flink
  mode: streaming             # batch | streaming | unified
  checkpoint_interval: 60s
  parallelism: 2

doris_ddl:
  database: proj_onto_paas
  table: silver_crm_pipeline_value
  layer: silver               # bronze | silver | gold
  unique_key: customer_id
  partition: none

contracts:
  - type: not_null
    columns: [customer_id]
  - type: freshness
    max_lag_minutes: 60
  - type: schema_drift
    on_breaking_change: alert

lineage:
  upstream:
    - postgres://sales.public.orders
  downstream:
    - doris://proj_onto_paas.silver_crm_pipeline_value
ℹ️Info

No proprietary runtime. Every line of generated code is standard Flink SQL and Doris DDL. You can inspect, modify, and version-control it all.
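As an illustration, the `freshness` contract in the manifest boils down to comparing the newest row's load timestamp against the clock. A minimal sketch (a hypothetical checker, not AIP's actual implementation):

```python
from datetime import datetime, timedelta, timezone

def check_freshness(latest_loaded_at, max_lag_minutes=60, now=None):
    """Return True if the newest loaded row is within the freshness SLA."""
    now = now or datetime.now(timezone.utc)
    return now - latest_loaded_at <= timedelta(minutes=max_lag_minutes)
```

When this check fails, the contract's configured action fires — here, an alert rather than a pipeline halt.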

#The Three-Layer Architecture

What makes AIP different from simple code generation is its three-layer approach:

[Diagram: Intent → Pattern → Code — the three-layer pipeline generation architecture]

#1. Intent Layer

Natural language understanding extracts structured intent: what sources are involved, what transformations are needed, what the target looks like, and what quality expectations exist.

JSON
{
  "intent": {
    "sources": [
      { "system": "postgres", "schema": "public", "tables": ["orders", "customers"] }
    ],
    "transforms": [
      { "type": "dedup", "key": "order_id" },
      { "type": "aggregate", "metric": "pipeline_value", "group_by": "customer_id" }
    ],
    "target": { "system": "doris", "database": "analytics", "layer": "silver" },
    "mode": "streaming",
    "quality": { "freshness_sla_minutes": 60 }
  }
}

#2. Pattern Layer

AIP matches intent against a library of proven Flink SQL patterns — CDC ingestion, deduplication with ROW_NUMBER over partition, incremental aggregation, upsert sinks, watermark handling — each validated across hundreds of real deployments.
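The pattern layer can be pictured as a lookup from structured transform intents to parameterized SQL templates. A simplified sketch (the template strings and `render` helper are hypothetical, not AIP's internal library):

```python
# Hypothetical pattern library: transform type -> Flink SQL template.
PATTERNS = {
    "dedup": (
        "SELECT * FROM (SELECT *, ROW_NUMBER() OVER "
        "(PARTITION BY {key} ORDER BY updated_at DESC) AS _rk "
        "FROM {source}) WHERE _rk = 1"
    ),
    "aggregate": (
        "SELECT {group_by}, SUM({metric}) AS {metric} "
        "FROM {source} GROUP BY {group_by}"
    ),
}

def render(transform, source):
    """Pick the proven template for a transform intent and fill it in."""
    template = PATTERNS[transform["type"]]
    params = {k: v for k, v in transform.items() if k != "type"}
    return template.format(source=source, **params)
```

Because the templates are fixed and pre-validated, the same intent always yields the same SQL — the LLM chooses *which* pattern applies, not how to write it.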

#3. Code Layer

The code generator produces deterministic, tested implementation code. Unlike pure LLM generation, this layer uses templates and constraint solving to guarantee correctness:

YAML
# pipeline_config.yml — generated constraints
quality_checks:
  - type: not_null
    columns: [customer_id, order_id]
  - type: referential_integrity
    from: orders.customer_id
    to: customers.customer_id
  - type: freshness
    max_age_minutes: 120
    alert_channels: [slack, pagerduty]

retry_policy:
  max_attempts: 3
  backoff: exponential
  initial_delay_seconds: 30

dead_letter:
  enabled: true
  destination: s3://data-lake/dead-letter/crm/
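One way to picture the constraint-solving step: before emitting code, the generator can verify that every column a quality check references actually exists in the declared schemas. A simplified sketch with hypothetical names:

```python
def validate_quality_checks(checks, schemas):
    """Return error strings for checks that reference unknown columns.
    schemas maps table name -> set of column names."""
    errors = []
    for check in checks:
        known = schemas.get(check["table"], set())
        for col in check.get("columns", []):
            if col not in known:
                errors.append(
                    f"{check['type']}: unknown column {check['table']}.{col}"
                )
    return errors
```

A failed validation here blocks generation entirely — the point of the deterministic layer is that malformed configurations never reach deployment.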
💡Tip

For truly novel requirements, AIP generates a scaffold with clearly marked extension points where you add custom logic, rather than trying to guess.

#Performance Comparison

| Metric | Traditional | AIP AI Builder |
|---|---|---|
| Time to first pipeline | 2–4 weeks | < 10 minutes |
| Lines of boilerplate | ~5,000 | Auto-generated |
| Built-in quality checks | Manual | Automatic |
| Monitoring setup | Separate task | Included |
| Schema evolution handling | Error-prone | Auto-detected |
| Batch + streaming parity | Two code paths | One Flink SQL job |

#Try It Yourself

The AI Pipeline Builder is available in all AIP plans, including the free trial. Describe a pipeline you've been putting off building, and see how fast it ships.

Bash
# Quick start — describe your pipeline
mds pipeline create --prompt "Load daily sales from PostgreSQL, \
  calculate 7-day rolling averages, \
  detect anomalies using 2-sigma threshold, \
  output to Doris analytics schema"
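The 2-sigma threshold in that prompt is the standard rule: flag any value more than two standard deviations from the mean of the series. In Python terms:

```python
from statistics import mean, stdev

def two_sigma_anomalies(values):
    """Return indices of values lying more than two standard
    deviations from the mean of the series."""
    mu, sigma = mean(values), stdev(values)
    return [i for i, v in enumerate(values) if abs(v - mu) > 2 * sigma]
```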

Most users report their first pipeline goes from prompt to production in under ten minutes.
