Why One Kafka Connector Should Replace 200: How AI Is Finally Fixing the Integration Sprawl Problem
Today we’re open-sourcing kafka-connect-ai — a single, Apache 2.0 licensed Kafka Connect connector that replaces hundreds of single-purpose connectors with 12 protocol adapters and an LLM-powered transformation pipeline.
The technical breakthroughs that make this possible: compiled transforms that let the LLM generate reusable transformation code (declarative mappings at ~500ns/record, sandboxed GraalVM JavaScript at ~2μs/record) instead of calling the model per-record; 4-tier model routing that automatically sends simple records to the cheapest capable model — or bypasses the LLM entirely for deterministic transforms; semantic caching via Redis vector similarity that eliminates duplicate LLM calls; and PII masking that strips sensitive fields before they ever reach the model. The result: a universal connector that handles HTTP, JDBC, MongoDB, gRPC, Kafka-to-Kafka, Redis, Cassandra, Kinesis, cloud data warehouses, WebSocket/SSE streaming, LDAP, and SAP — with LLM costs that converge toward zero as the system learns your schemas.
The Connector Sprawl Problem
Ask any Kafka engineer what keeps them up at night, and it is rarely the broker. It is Connect. More specifically, it is the ever-expanding sprawl of connector JARs, configuration models, dependency conflicts, and version mismatches that accumulates in every mature Kafka deployment until it becomes a full-time job.
200 Connectors, One Fundamental Operation
The OSO engineers regularly walk into organisations running fifteen to twenty different connector JARs in a single Kafka Connect cluster. Each has its own version, its own transitive dependencies, and its own classpath conflicts with the others. Maintaining that ecosystem — updating JARs, resolving dependency collisions, debugging serialisation mismatches — consumes two to three engineers who should be building pipelines.
Every connector also introduces a new mental model. The JDBC connector structures its configuration differently from the HTTP connector. The S3 connector has its own authentication surface. The Elasticsearch connector has its own concept of index routing. There is no unified configuration language. Knowledge does not transfer — expertise with the Salesforce connector gives an engineer nothing useful when diagnosing the MongoDB connector.
Licensing Has Turned Sprawl Into a Strategic Risk
Apache Kafka itself remains Apache 2.0. But many connectors have followed a different trajectory. The Confluent Community Licence, introduced in 2018, moved Schema Registry, KSQL, and a significant portion of the connector catalogue under a licence that prohibits offering those components as competing SaaS products. Premium Connectors extended this further — mission-critical integrations like Oracle CDC are gated behind enterprise subscriptions.
The four risks are concrete: licensing terms on currently-used connectors may change; cost escalation follows platform lock-in; migrating away from connector-specific investments is extremely high-friction; and innovation velocity may slow as contributing organisations’ priorities shift.
kafka-connect-ai is Apache 2.0 licensed. No Community Licence, no Premium tier, no enterprise restrictions.
The Core Insight: Protocols, Not Products
After more than thirty enterprise Kafka engagements, the OSO engineers arrived at a conclusion that runs against the grain of how the Connect ecosystem developed: the connector-per-system model is a categorical error. The right abstraction is the protocol.
What changes between Salesforce and HubSpot is not the protocol — both speak REST HTTP. What changes between PostgreSQL and MySQL is not the protocol — both speak JDBC. What changes is the configuration surface and the data shape. Build a clean implementation of each protocol once, and let AI handle the per-system transformation dynamically.
kafka-connect-ai ships with 12 protocol adapters today:
| Adapter | Covers | Key Capabilities |
|---|---|---|
| HTTP REST | Every REST/SaaS API | 5 auth modes, 5 pagination strategies |
| JDBC | Every SQL database | 4 query modes, auto-DDL, upsert |
| Kafka | Cross-cluster replication | K2K bridge and migration |
| MongoDB | Document stores | Wire protocol, change streams |
| Warehouse | Snowflake, Redshift, BigQuery | Cloud analytics sinks |
| gRPC | Any gRPC service | Protobuf service definitions |
| Streaming | WebSocket, SSE, CometD | Real-time event sources |
| Redis | Streams, Pub/Sub, KV | Full Redis data model |
| Cassandra | Wide-column stores | CQL native protocol |
| Kinesis | AWS event streaming | AWS-native source/sink |
| LDAP | Directory services | Identity and org sync |
| SAP | Enterprise ERP | RFC calls (profile-gated) |
Each adapter covers an entire category of systems. The HTTP adapter covers every REST API. The JDBC adapter covers every SQL database. The gRPC adapter covers any gRPC service. Adding a new integration is a configuration change, not a development project.
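The protocol-first abstraction can be sketched in a few lines. This is an illustrative Python sketch, not the connector's actual Java API; the `ProtocolAdapter` and `HttpAdapter` names are hypothetical:

```python
from abc import ABC, abstractmethod

class ProtocolAdapter(ABC):
    """One implementation per protocol; per-system differences live in config."""

    @abstractmethod
    def poll(self, config: dict) -> list[dict]:
        """Fetch a batch of records using only configuration."""

class HttpAdapter(ProtocolAdapter):
    def poll(self, config: dict) -> list[dict]:
        # A real adapter would issue the request using config["url"],
        # the configured auth mode, pagination strategy, and so on.
        return [{"source": config["url"], "payload": {}}]

# Salesforce and HubSpot differ only in configuration, not in code:
adapter = HttpAdapter()
records = adapter.poll({"url": "https://api.salesforce.example/records"})
records += adapter.poll({"url": "https://api.hubspot.example/objects"})
```

The point is that the class hierarchy is per protocol, not per product: adding a new REST-based SaaS integration touches only the `config` dict.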
Architecture: How It Actually Works
kafka-connect-ai is not an AI wrapper on top of existing connectors. It is a purpose-built transformation pipeline where the LLM generates code, not just data — and that code is cached and reused across all subsequent records with the same schema shape.

The Key Innovation: Compiled Transforms
This is the breakthrough that makes AI-powered data transformation economically viable at scale.
Traditional approaches call the LLM for every record. kafka-connect-ai takes a fundamentally different approach: the LLM generates transformation code, not transformed data. That code is compiled, validated, cached, and reused for every subsequent record with the same schema shape.

The two-tier execution strategy:
- Tier 0 — Declarative mappings (~500ns/record): Pure Java, zero security risk. Handles field renames, JSONPath mappings, defaults, and type casts. No scripting engine involved.
- Tier 1 — Sandboxed JavaScript (~2μs/record after JIT): GraalVM Polyglot with full sandboxing — no file system access, no network access, no host access. Configurable memory limit (10MB default) and execution timeout (100ms default). Handles complex transformations with conditional logic.
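The cache-by-schema-shape mechanic behind compiled transforms can be shown with a small sketch. This is illustrative Python, not the connector's implementation; the fingerprinting scheme and the hand-written stand-in transform are assumptions:

```python
import hashlib
import json

def schema_fingerprint(record: dict) -> str:
    """Fingerprint the schema *shape*: sorted field names plus value types."""
    shape = sorted((k, type(v).__name__) for k, v in record.items())
    return hashlib.sha256(json.dumps(shape).encode()).hexdigest()[:16]

compiled_cache = {}   # fingerprint -> compiled transform function
llm_calls = 0

def transform(record: dict) -> dict:
    global llm_calls
    fp = schema_fingerprint(record)
    if fp not in compiled_cache:
        # Cache miss: this is where the real pipeline asks the LLM to generate
        # transformation *code*, then compiles and validates it. A hand-written
        # mapping stands in here to keep the sketch runnable.
        llm_calls += 1
        compiled_cache[fp] = lambda r: {"customer": r["name"], "id": r["order_id"]}
    return compiled_cache[fp](record)

transform({"name": "Ada", "order_id": 1})   # first record: one LLM call
transform({"name": "Bob", "order_id": 2})   # same shape: cached code, no LLM call
```

Every record after the first with the same shape runs pure compiled code, which is why the per-record LLM cost converges toward zero.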
4-Tier Model Routing
Not every record needs the same model. kafka-connect-ai automatically routes records to the cheapest capable tier:
| Tier | Model | When | Cost |
|---|---|---|---|
| T0 | Deterministic (no LLM) | Field renames, type casts, timestamp formatting | $0 |
| T1 | Claude Haiku | Simple flat records | ~$0.25/MTok |
| T2 | Claude Sonnet | Moderate complexity | ~$3/MTok |
| T3 | Claude Opus | Complex nested structures | ~$15/MTok |
Records cascade down tiers — the router picks the cheapest model that can handle the complexity.
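A routing heuristic of this kind might look like the following sketch. The thresholds and the depth-based complexity score are assumptions for illustration, not the connector's actual routing logic:

```python
def nesting_depth(value, d=0):
    """Maximum dict-nesting depth of a record (a flat record scores 1)."""
    if isinstance(value, dict):
        return max((nesting_depth(v, d + 1) for v in value.values()), default=d + 1)
    if isinstance(value, list):
        return max((nesting_depth(v, d) for v in value), default=d)
    return d

def route(record: dict, deterministic: bool = False) -> str:
    if deterministic:
        return "T0"   # field renames, casts: no LLM call at all
    depth = nesting_depth(record)
    if depth <= 1 and len(record) <= 10:
        return "T1"   # simple flat record -> cheapest model
    if depth <= 3:
        return "T2"   # moderate nesting -> mid-tier model
    return "T3"       # deeply nested -> strongest model
```

The design intent is that the expensive model is the exception, not the default: most production records are flat or shallow and never reach T3.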
Cost Optimisation Stack
Five layers work together to minimise LLM spend:
| Layer | Mechanism | Savings |
|---|---|---|
| Compiled Transforms | LLM generates code once, cached by schema fingerprint | 100% after first record |
| Tier 0 Deterministic | Field renames, type casts — no LLM call at all | 100% |
| Tier 1 Fast Model | Simple records routed to Claude Haiku | ~90% vs default model |
| Semantic Cache | Redis vector similarity deduplicates LLM calls | 100% per cache hit |
| Prompt Caching | Anthropic caches repeated system prompts | ~50% input tokens |
Privacy: PII Masking
Sensitive fields are masked before data reaches the LLM and restored after transformation:
```
Input     {"name": "John Smith", "email": "john@example.com", "order_id": 42}
            ↓ mask
To LLM    {"name": "[MASKED_0]", "email": "[MASKED_1]", "order_id": 42}
            ↓ transform
From LLM  {"customer": "[MASKED_0]", "contact": "[MASKED_1]", "id": 42}
            ↓ unmask
Output    {"customer": "John Smith", "contact": "john@example.com", "id": 42}
```
Configure masking by field name (case-insensitive) or by regex pattern. This supports GDPR compliance by ensuring raw PII never leaves your infrastructure boundary to reach the LLM provider.
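The mask-transform-unmask round trip can be sketched as follows. This is an illustrative Python sketch under assumed semantics (tokenised placeholders held in a local vault), not the connector's implementation:

```python
import re

MASK_FIELDS = {"name", "email"}   # field names, matched case-insensitively
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")

def mask(record: dict):
    masked, vault, i = {}, {}, 0
    for key, value in record.items():
        by_name = key.lower() in MASK_FIELDS
        by_pattern = isinstance(value, str) and EMAIL_RE.fullmatch(value)
        if by_name or by_pattern:
            token = f"[MASKED_{i}]"
            vault[token] = value        # original value never leaves this process
            masked[key] = token
            i += 1
        else:
            masked[key] = value
    return masked, vault

def unmask(record: dict, vault: dict) -> dict:
    return {k: vault.get(v, v) if isinstance(v, str) else v
            for k, v in record.items()}

masked, vault = mask({"name": "John Smith", "email": "john@example.com", "order_id": 42})
# The LLM sees only tokens; it may rename keys but carries the tokens through:
from_llm = {"customer": masked["name"], "contact": masked["email"], "id": 42}
restored = unmask(from_llm, vault)
```

Because the vault never accompanies the prompt, the LLM provider sees placeholders only; the sensitive values are restored after the transformed record comes back.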
Production Observability: 16 JMX Metrics
Every stage of the pipeline is instrumented via Micrometer:
| Category | Metrics |
|---|---|
| LLM | llm.calls.total, llm.call.latency (histogram), llm.tokens.input.total, llm.tokens.output.total, llm.cost.usd.total |
| Cache | cache.hits.total, cache.misses.total, cache.hit.ratio |
| Router | router.tier0.total, router.tier1.total, router.tier2.total, router.tier3.total |
| Transforms | compiled.transform.hits.total, compiled.transform.misses.total, compiled.transform.compilations.total |
| Resilience | circuit.breaker.open.total, llm.rate.limited.total |
| Adapter | records.processed.total, records.failed.total, adapter.fetch.latency, adapter.write.latency, batch.size |
All prefixed with connect.ai.* and exportable to Prometheus, Datadog, Grafana, or any Micrometer-compatible backend.
Getting Started
Prerequisites
- Java 17+
- Docker (for the quickstart stack)
- An Anthropic or OpenAI API key
1. Start the Stack
```bash
git clone https://github.com/osodevops/kafka-connect-ai.git
cd kafka-connect-ai
mvn clean package -pl kafka-connect-ai-connect -am -DskipTests
cd docker && docker compose up -d
```
This starts Kafka (KRaft mode), Schema Registry, Kafka Connect with kafka-connect-ai pre-loaded, PostgreSQL, and Redis.
2. Verify the Connector Plugin is Loaded
```bash
curl -s http://localhost:8083/connector-plugins | jq '.[].class' | grep ai
```
You should see:
```
"sh.oso.connect.ai.connect.source.AiSourceConnector"
"sh.oso.connect.ai.connect.sink.AiSinkConnector"
```
3. Example: REST API Source with LLM Transformation
Pull data from any REST API, transform it with AI, and write to Kafka:
```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "weather-api-source",
    "config": {
      "connector.class": "sh.oso.connect.ai.connect.source.AiSourceConnector",
      "tasks.max": "1",
      "connect.ai.source.adapter": "http",
      "connect.ai.topic": "weather-events",
      "http.source.url": "https://api.openweathermap.org/data/2.5/weather?q=London&appid=YOUR_KEY",
      "http.source.poll.interval.ms": "300000",
      "http.source.auth.type": "none",
      "ai.llm.provider": "anthropic",
      "ai.llm.api.key": "${ANTHROPIC_API_KEY}",
      "ai.llm.model": "claude-sonnet-4-20250514",
      "ai.agent.system.prompt": "Extract: city, temp_celsius (convert from Kelvin), humidity_pct, wind_speed_ms, conditions, recorded_at (ISO 8601)."
    }
  }'
```
4. Example: Database CDC with PII Masking
Capture changes from PostgreSQL with sensitive fields automatically masked before reaching the LLM:
```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "orders-db-source",
    "config": {
      "connector.class": "sh.oso.connect.ai.connect.source.AiSourceConnector",
      "tasks.max": "1",
      "connect.ai.source.adapter": "jdbc",
      "connect.ai.topic": "order-events",
      "jdbc.url": "jdbc:postgresql://postgres:5432/kcai",
      "jdbc.user": "kcai",
      "jdbc.password": "kcai",
      "jdbc.table": "orders",
      "jdbc.query.mode": "timestamp",
      "jdbc.timestamp.column": "updated_at",
      "ai.llm.provider": "anthropic",
      "ai.llm.api.key": "${ANTHROPIC_API_KEY}",
      "ai.llm.model": "claude-sonnet-4-20250514",
      "ai.agent.system.prompt": "Normalise order rows into events with: order_id, customer_name, amount_usd, currency, status, event_timestamp.",
      "ai.privacy.mask.fields": "email,phone,ssn,credit_card",
      "ai.privacy.mask.patterns": "\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}\\b",
      "ai.privacy.unmask.output": "true"
    }
  }'
```
5. Example: Kafka-to-Kafka Transformation with Full Cost Optimisation
Consume from one cluster, transform with AI, produce to another — with all cost-saving layers enabled:
```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "k2k-normaliser",
    "config": {
      "connector.class": "sh.oso.connect.ai.connect.source.AiSourceConnector",
      "tasks.max": "1",
      "connect.ai.source.adapter": "kafka",
      "connect.ai.topic": "normalised-events",
      "kafka.source.bootstrap.servers": "upstream-kafka:9092",
      "kafka.source.topics": "raw-events",
      "ai.llm.provider": "anthropic",
      "ai.llm.api.key": "${ANTHROPIC_API_KEY}",
      "ai.llm.model": "claude-sonnet-4-20250514",
      "ai.agent.system.prompt": "Merge legacy event formats into a unified schema: event_id, event_type, source_system, payload (object), occurred_at (ISO 8601).",
      "ai.agent.enable.prompt.caching": "true",
      "ai.router.enabled": "true",
      "ai.router.deterministic.patterns": "field_rename,type_cast,timestamp_format",
      "ai.llm.model.fast": "claude-haiku-4-5-20251001",
      "ai.cache.enabled": "true",
      "ai.cache.redis.url": "redis://redis:6379",
      "ai.compiled.transform.enabled": "true",
      "ai.compiled.transform.ttl.seconds": "86400",
      "ai.batch.size": "50",
      "ai.batch.wait.ms": "5000",
      "ai.batch.parallel.calls": "4",
      "ai.llm.rate.limit": "100"
    }
  }'
```
6. Example: MongoDB to Snowflake via Kafka (Sink Connector)
Write transformed records from Kafka to a cloud data warehouse:
```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "warehouse-sink",
    "config": {
      "connector.class": "sh.oso.connect.ai.connect.sink.AiSinkConnector",
      "tasks.max": "2",
      "topics": "normalised-events",
      "connect.ai.sink.adapter": "warehouse",
      "warehouse.type": "snowflake",
      "warehouse.snowflake.url": "https://acme.snowflakecomputing.com",
      "warehouse.snowflake.database": "ANALYTICS",
      "warehouse.snowflake.schema": "PUBLIC",
      "warehouse.snowflake.table": "EVENTS",
      "ai.llm.provider": "anthropic",
      "ai.llm.api.key": "${ANTHROPIC_API_KEY}",
      "ai.llm.model": "claude-sonnet-4-20250514",
      "ai.agent.system.prompt": "Map event fields to warehouse columns: EVENT_ID, EVENT_TYPE, SOURCE, PAYLOAD_JSON, CREATED_AT."
    }
  }'
```
Adoption Guide: No Forklift Migration Required
kafka-connect-ai is a Kafka Connect connector. It deploys inside your existing Connect cluster, alongside your existing connectors, installed identically to any other connector JAR. There is no forklift migration.
Phase 1: Start with HTTP-based SaaS Integrations
These are the highest-friction connectors to maintain — API versioning changes, authentication updates, and pagination modifications from vendors require connector updates that depend on external maintainers. With kafka-connect-ai, the same vendor-side change becomes a configuration update you can apply yourself, immediately.
Phase 2: Consolidate JDBC Connectors
Most organisations have accumulated multiple JDBC connectors across different database sources. Consolidating into one adapter eliminates classpath conflicts, creates a single configuration model, and surfaces configuration inconsistencies across independently-managed connectors.
Phase 3: Expand to Specialised Protocols
With 12 adapters covering MongoDB, gRPC, Redis, Cassandra, Kinesis, WebSocket/SSE streaming, LDAP, SAP, and cloud data warehouses (Snowflake, Redshift, BigQuery), you can progressively migrate specialised connectors without changing anything about your Kafka infrastructure.
Phase 4: Audit Licensing Exposure
Separate from the technical migration, identify which connectors in your cluster fall under the Confluent Community Licence, which are gated behind enterprise subscriptions, and which carry managed connector pricing. Quantify the cost. kafka-connect-ai is Apache 2.0 — run it on-premises, in the cloud, or offer it as a service, with zero licensing constraints.
Comparison
| Feature | kafka-connect-ai | Custom Connector | Debezium | MirrorMaker 2 |
|---|---|---|---|---|
| Sources | 12 protocol adapters | One per connector | JDBC (CDC) | Kafka only |
| Sinks | HTTP, JDBC, MongoDB, Warehouse, Redis, Cassandra, Kinesis, gRPC | One per connector | N/A | Kafka only |
| Transformation | LLM (natural language) + compiled transforms | Java code | SMTs only | SMTs only |
| Schema enforcement | JSON Schema + LLM auto-retry | Manual | Avro/JSON Schema | None |
| New integration effort | Config change | Weeks of development | Limited to supported DBs | Kafka-to-Kafka only |
| Multi-model routing | 4-tier automatic | N/A | N/A | N/A |
| Semantic caching | Redis vector similarity | N/A | N/A | N/A |
| PII masking | Built-in field + regex masking | Custom code | N/A | N/A |
| Cost tracking | Per-record USD cost metrics | N/A | N/A | N/A |
| Licence | Apache 2.0 | Varies | Apache 2.0 | Apache 2.0 |
When NOT to Use kafka-connect-ai
- Sub-millisecond latency — Initial LLM calls add latency (100ms-5s). Compiled transforms bring this down to microseconds, but the first record for each new schema shape requires an LLM call.
- Full CDC with WAL — For database replication with transaction ordering, Debezium is purpose-built. kafka-connect-ai uses polling queries.
- Binary data — kafka-connect-ai works with JSON. For Avro, Protobuf, or binary payloads, use specialised connectors.
Conclusion
The connector-per-system model was a pragmatic solution to a 2015 problem. In 2025, it has become a source of operational overhead, licensing risk, and organisational fragility. Two hundred connectors, each encoding the same structural logic for a different product, each requiring specialist knowledge — this is not the right foundation for enterprise data integration at scale.
The abstraction was always wrong. Protocols matter; products do not. What was missing was a transformation engine sophisticated enough to handle per-system variability without a separate codebase for each system — and smart enough to learn from the data it sees so it only needs the LLM once per schema shape, not once per record.
That is what kafka-connect-ai provides. Twelve protocol adapters. LLM-compiled transforms that converge toward zero cost. Four-tier model routing. PII masking. Semantic caching. 16 production-grade JMX metrics. Apache 2.0 licensed.
The project is open source, the adapter interface is designed for community contribution, and the conversation is open.