# Kafka and Redpanda

> How to configure a Kafka or Redpanda queue target — broker prerequisites, topic creation, compression, and connection testing.

## Overview

Apache Kafka and Redpanda are interchangeable from Wirekite's
perspective: both speak the Kafka wire protocol, and Wirekite uses the
same driver and the same code path for both. The only customer-visible
difference is the **type** label on the queue target (`kafka` vs
`redpanda`) — pick whichever matches the system you're actually
operating, since that label flows through to the UX list and metrics
attribution.

For cross-vendor concepts (message format, ordering, retries, charts),
see [Queues Overview](/queues/overview).

## Prerequisites

### Broker cluster

You need a reachable broker cluster — Wirekite does not bundle or
manage one. The cluster must:

* Speak Kafka protocol (Kafka, Redpanda, or any compatible
  implementation).
* Accept plaintext connections from the Wirekite extractor host. SASL
  and TLS are not yet supported by the Wirekite client; terminate them
  in front of the broker if your environment requires them.
* Allow auto-topic-creation OR have the topic pre-created (see below).

### Topic

Wirekite uses **one topic per queue target**: every change record from
every table in the migration lands on the same topic. Pick a name that
attributes the stream to the migration, e.g.
`<env>-wirekite-<source>-to-<target>`. To pre-create the topic, use the
CLI that ships with your broker:

```
# Kafka
kafka-topics.sh --bootstrap-server <broker>:9092 \
  --create --topic <topic-name> \
  --partitions <N> --replication-factor <R>

# Redpanda
rpk topic create <topic-name> \
  --partitions <N> --replicas <R> \
  --brokers <broker>:9092
```
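The naming pattern `<env>-wirekite-<source>-to-<target>` can be generated and checked before the topic is created. The following is a minimal sketch, not part of Wirekite: `topic_name` is a hypothetical helper, and the validation reflects Kafka's general topic-name rule (ASCII letters, digits, `.`, `_`, `-`, at most 249 characters).

```python
import re

# Kafka topic names: ASCII letters, digits, '.', '_' and '-', max 249 chars.
_TOPIC_RE = re.compile(r"[A-Za-z0-9._-]{1,249}")


def topic_name(env: str, source: str, target: str) -> str:
    """Build '<env>-wirekite-<source>-to-<target>' and check it is legal.

    Hypothetical helper for illustration; Wirekite itself accepts any
    topic name the broker accepts.
    """
    name = f"{env}-wirekite-{source}-to-{target}"
    if not _TOPIC_RE.fullmatch(name):
        raise ValueError(f"not a legal Kafka topic name: {name!r}")
    return name


print(topic_name("prod", "oracle", "snowflake"))
# prod-wirekite-oracle-to-snowflake
```

Rejecting an illegal name locally gives a clearer error than a failed `--create` call.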

<Tip>
  Partition count caps the consumer-side parallelism. For a single-table
  migration that fits in one partition, `--partitions 1` keeps things
  simple; for high-volume multi-table migrations, size partitions to the
  expected consumer parallelism. Wirekite's per-table ordering guarantee
  holds at any partition count because lines from one transaction ship in
  one ordered batch.
</Tip>

If `auto.create.topics.enable=true` on the broker, Wirekite's first
publish will create the topic implicitly. The connection test (see
below) explicitly avoids auto-create so a missing topic surfaces as an
error rather than an unintended side effect.

## Wirekite configuration

### Queue-target definition

Create the queue target through the UX wizard or the API. The field set:

| Field         | Required | Description                                                    |
| ------------- | -------- | -------------------------------------------------------------- |
| `type`        | yes      | `kafka` or `redpanda`                                          |
| `brokers`     | yes      | Comma-separated `host:port` list                               |
| `topic`       | yes      | Topic name (must exist or auto-create must be enabled)         |
| `batchSize`   | no       | `.ckt` lines per Kafka message; default `1`                    |
| `compression` | no       | `none`, `snappy`, `lz4`, `zstd`; default `""` (same as `none`) |

The same fields, with the same names, are exposed through the API.
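When definitions are generated by a script, it can help to check them against the table above before submitting. The sketch below is illustrative only: `validate_queue_target` is a hypothetical helper, and the rule that `batchSize` must be a positive integer is an assumption that the table does not state.

```python
ALLOWED_TYPES = {"kafka", "redpanda"}
ALLOWED_COMPRESSION = {"", "none", "snappy", "lz4", "zstd"}


def validate_queue_target(defn: dict) -> list:
    """Return a list of problems; an empty list means no problem was found."""
    problems = []
    if defn.get("type") not in ALLOWED_TYPES:
        problems.append("type must be 'kafka' or 'redpanda'")

    raw = str(defn.get("brokers", ""))
    brokers = [b.strip() for b in raw.split(",") if b.strip()]
    if not brokers:
        problems.append("brokers must list at least one host:port")
    for broker in brokers:
        host, sep, port = broker.rpartition(":")
        if not (sep and host and port.isdecimal() and 0 < int(port) < 65536):
            problems.append(f"broker {broker!r} is not host:port")

    if not defn.get("topic"):
        problems.append("topic is required")

    # Assumption: a batch must hold at least one line.
    batch = defn.get("batchSize", 1)
    if not isinstance(batch, int) or batch < 1:
        problems.append("batchSize must be a positive integer")

    if defn.get("compression", "") not in ALLOWED_COMPRESSION:
        problems.append("compression must be none, snappy, lz4, or zstd")
    return problems


print(validate_queue_target(
    {"type": "kafka", "brokers": "broker1:9092", "topic": "orders-topic"}
))
# []
```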

### Extractor config keys

When wiring the extractor directly (e.g. for `qa/fulltest` or scripted
runs), the orchestrator emits these keys into the change-extractor
config:

```
queueType=kafka
queueBrokers=broker1:9092,broker2:9092
queueTopic=<topic-name>
queueBatchSize=100
queueCompression=zstd
```

`queueType=redpanda` is accepted as a synonym — the dispatch is identical.
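For scripted runs, the key block above can be rendered from a queue-target definition. This is a sketch under assumptions: the field-to-key mapping is inferred from the matching names in this page, `render_extractor_config` is a hypothetical helper, and skipping unset optional fields is a choice made here rather than documented orchestrator behaviour.

```python
# Mapping inferred from the field table and the key block on this page.
FIELD_TO_KEY = {
    "type": "queueType",
    "brokers": "queueBrokers",
    "topic": "queueTopic",
    "batchSize": "queueBatchSize",
    "compression": "queueCompression",
}


def render_extractor_config(defn: dict) -> str:
    """Render a queue-target definition as key=value lines."""
    lines = []
    for field, key in FIELD_TO_KEY.items():
        value = defn.get(field)
        if value is None or value == "":
            continue  # assumption: unset optional fields are omitted
        lines.append(f"{key}={value}")
    return "\n".join(lines)


print(render_extractor_config({
    "type": "kafka",
    "brokers": "broker1:9092,broker2:9092",
    "topic": "orders-topic",
    "batchSize": 100,
    "compression": "zstd",
}))
```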

## Compression

Compression is applied at the Kafka producer (per-message wire format),
not at the application layer. The codec choice is a CPU-vs-bandwidth
trade-off:

| Codec            | CPU cost | Compression ratio | Notes                                          |
| ---------------- | -------- | ----------------- | ---------------------------------------------- |
| `none` (default) | None     | 1.0×              | Use when broker is local and bandwidth is free |
| `snappy`         | Low      | \~2×              | Good general default                           |
| `lz4`            | Low      | \~2×              | Slightly faster than snappy on most CPUs       |
| `zstd`           | Medium   | \~3×              | Best compression; modest CPU cost              |

For a CDC stream of newline-delimited row data, `zstd` typically cuts
broker storage by half or more, at a small CPU cost on the extractor
side.
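Ratios depend heavily on the data, so it is worth measuring on a sample of your own stream. The sketch below shows how a ratio is computed; it uses `zlib` from the Python standard library purely as a stand-in, which is **not** one of the codecs Wirekite offers, and the sample rows (including the `I` operation code) are made up for illustration.

```python
import zlib


def compression_ratio(payload: bytes, level: int = 6) -> float:
    """Uncompressed size divided by compressed size (higher is better)."""
    return len(payload) / len(zlib.compress(payload, level))


# Synthetic, hypothetical CDC-like sample: tab-separated, newline-delimited.
sample = "".join(
    f"I\tsales\torders\t{i}\tcustomer-{i % 50}\t2024-01-{(i % 28) + 1:02d}\n"
    for i in range(5000)
).encode("utf-8")

print(f"{len(sample)} bytes, ratio {compression_ratio(sample):.1f}x")
```

Swap in a real snappy, lz4, or zstd binding and a captured sample of `.ckt` lines to get numbers that apply to your migration.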

## Authentication

<Warning>
  The current release supports **plaintext brokers only**. SASL\_PLAINTEXT,
  SASL\_SSL, and mTLS are not wired through the Wirekite client. If your
  brokers require auth, terminate it at a sidecar/proxy in front of the
  broker — for example, a local Kafka REST proxy or an
  identity-translation gateway — until first-class auth lands.
</Warning>

## Connection testing

The UX wizard exposes a **Test Message** button that runs a Metadata
request against the configured brokers with auto-create disabled. A
successful test confirms:

* The first broker in the list is reachable on the configured port.
* The cluster responds to a metadata request within a 10-second timeout.
* The named topic exists.

Failure modes the test catches:

| Test failure                          | Cause                                        |
| ------------------------------------- | -------------------------------------------- |
| `broker <addr> unreachable`           | DNS, firewall, port, or down broker          |
| `topic does not exist on the cluster` | Topic not pre-created and auto-create is off |
| `metadata error for topic`            | Cluster-side ACL or other broker rejection   |

The same probe runs at extractor startup; a queue target that passes
the wizard test will also pass the startup gate, barring intervening
broker state changes.
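Before opening the wizard, the first check in the list above (TCP reachability of the first broker) can be approximated from the extractor host. This sketch covers only that step: it does not send a Kafka Metadata request and cannot tell whether the topic exists. `first_broker_reachable` is a hypothetical helper, and the 10-second default simply mirrors the timeout quoted above.

```python
import socket


def first_broker_reachable(brokers: str, timeout: float = 10.0) -> bool:
    """TCP-connect to the first host:port of a comma-separated broker list."""
    first = brokers.split(",")[0].strip()
    host, _, port = first.rpartition(":")
    try:
        # Opens and immediately closes a plain TCP connection.
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except (OSError, ValueError, OverflowError):
        # OSError: DNS failure, refusal, or timeout.
        # ValueError / OverflowError: missing or out-of-range port.
        return False


if __name__ == "__main__":
    # Placeholder address; replace with your own broker list.
    print(first_broker_reachable("localhost:9092", timeout=2.0))
```

A `False` result corresponds to the `broker <addr> unreachable` row; a `True` result still leaves the topic and ACL checks to the real test.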

## Failure semantics

The Wirekite Kafka publisher is fail-fast: any publish error from
`kafka-go`'s `Writer.WriteMessages` (after the SDK's `MaxAttempts=10`
budget is spent) increments the `errors` counter, logs
`QUEUE: flush failed`, and exits the extractor. The orchestrator flips
the migration to **Replication Failed**; recovery is through
**Resume from Failed**, which replays from the last commit boundary
persisted to the progress DB.

On the SQL Server C extractor, the `librdkafka` driver runs with
`message.timeout.ms=300000` (5 minutes). Any message that cannot be
durably acknowledged within that window triggers the same fail-fast
path through the delivery-report callback.

This means a transient broker outage shorter than the SDK's retry
budget is invisible to Wirekite — the SDK retries internally. A
sustained outage causes a clean failure plus replay-on-recovery.
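The difference between a transient and a sustained outage can be illustrated with a small simulation. This is not Wirekite or `kafka-go` code: `publish_with_budget`, `PublishFailed`, and `flaky_sender` are hypothetical names, and the sketch omits the backoff between attempts that a real client applies.

```python
class PublishFailed(Exception):
    """Raised once the retry budget is spent."""


def publish_with_budget(send, batch, max_attempts: int = 10) -> int:
    """Call send(batch) up to max_attempts times; return attempts used."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            send(batch)
            return attempt
        except ConnectionError as exc:
            last_error = exc  # retry until the budget is exhausted
    raise PublishFailed(
        f"flush failed after {max_attempts} attempts"
    ) from last_error


def flaky_sender(failures: int):
    """Return a send() that fails `failures` times, then succeeds."""
    state = {"left": failures}

    def send(_batch):
        if state["left"] > 0:
            state["left"] -= 1
            raise ConnectionError("broker unavailable")

    return send


# Transient outage: absorbed by the retry budget, caller never sees it.
print(publish_with_budget(flaky_sender(3), ["line"]))  # 4

# Sustained outage: budget spent, the caller must fail fast.
try:
    publish_with_budget(flaky_sender(99), ["line"])
except PublishFailed as exc:
    print(exc)  # flush failed after 10 attempts
```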

## Operations

The reference consumer for end-to-end verification is at
`qa/queue_reader/queue_reader` in the Wirekite source tree:

```
queue_reader -brokers <broker>:9092 -topic <topic-name> \
  -group <consumer-group-id> -outdir <demux-dir>
```

It subscribes to the topic, demultiplexes incoming messages by the
`OP\tschema\ttable\t...` prefix into per-table `.ckt` files in
`<demux-dir>`, and prints final counters that should match the
extractor's `submitted` / `delivered`. This is the same utility used
by `qa/fulltest` to validate end-to-end correctness.
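The demultiplexing step can be sketched in a few lines for readers writing their own consumer. This is not the `queue_reader` source: the `<schema>.<table>.ckt` file naming and the `I` / `U` operation codes are assumptions for illustration, and the sketch groups lines in memory instead of writing files.

```python
from collections import defaultdict


def demux(lines):
    """Group lines by the schema and table in their tab-separated prefix."""
    per_table = defaultdict(list)
    for line in lines:
        # Prefix layout from this page: OP, schema, table, then the rest.
        fields = line.rstrip("\n").split("\t", 3)
        if len(fields) < 3:
            raise ValueError(f"line lacks OP/schema/table prefix: {line!r}")
        _op, schema, table = fields[:3]
        # Hypothetical naming scheme for the per-table output file.
        per_table[f"{schema}.{table}.ckt"].append(line)
    return dict(per_table)


messages = [
    "I\tsales\torders\t1\twidget\n",
    "U\tsales\torders\t1\tgadget\n",
    "I\thr\tstaff\t7\talice\n",
]
for name, rows in demux(messages).items():
    print(name, len(rows))
# sales.orders.ckt 2
# hr.staff.ckt 1
```

Appending in arrival order keeps each table's lines in the sequence they were consumed.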

For Pub/Sub-specific setup, see [Google Pub/Sub](/queues/pubsub).
