# Queues Overview

> How Wirekite publishes change records to message queues — supported backends, message format, ordering, batching, retries, and the operational signals you can monitor.

## Overview

A Wirekite **queue target** is a first-class destination — alongside file/DB
targets — that receives the change-data-capture stream produced by an
extractor. A migration may have a file/DB target, a queue target, or both.
When a queue is configured, every committed change is published as a
message; the queue is the integration point for downstream consumers
(your loader, a stream processor, a search index, etc.).

<Note>
  Queue targets are used during the **replication** phase only. Initial
  schema and bulk-data loads always go through the file/DB target. A
  migration whose only target is a queue is therefore a replication-only
  migration — it cannot be run in `data` or `data-replication` modes.
</Note>

## Supported backends

| Backend        | Type identifier | Driver                                                                               |
| -------------- | --------------- | ------------------------------------------------------------------------------------ |
| Apache Kafka   | `kafka`         | `segmentio/kafka-go` (mysql/postgres/oracle), `librdkafka` (sqlserver)               |
| Redpanda       | `redpanda`      | Same Kafka driver — Redpanda speaks the Kafka wire protocol                          |
| Google Pub/Sub | `pubsub`        | `cloud.google.com/go/pubsub` (mysql/postgres/oracle), `google-cloud-cpp` (sqlserver) |

All four source databases (MySQL, PostgreSQL, Oracle, SQL Server) can
publish to any of the three backends. The choice is per-queue-target,
not per-deployment — a single Wirekite installation can have Kafka,
Redpanda, and Pub/Sub queue targets in use at the same time.

<Tip>
  Pick **Kafka or Redpanda** when you control the broker hardware, latency
  is dominant, or the downstream consumer is Kafka-native. Pick **Pub/Sub**
  when the deployment already runs in Google Cloud, or you don't want to
  operate a broker cluster.
</Tip>

## Configuration parameters

The same parameter set applies whether you create a queue target through
the UX wizard, the API, or by hand-writing extractor configs for
`qa/fulltest`. A queue target is identified by `queueType`; the rest of
the parameters split into shared, kafka-only, and pubsub-only groups.

### Shared parameters

<ResponseField name="queueType" type="string" required>
  Backend selector. One of `kafka`, `redpanda`, `pubsub`. Empty value
  defaults to `kafka` for backward compatibility with pre-Pub/Sub configs.
</ResponseField>

<ResponseField name="queueTopic" type="string" required>
  Topic / Pub/Sub topic id. A single topic carries every change record
  for the migration — there is one topic per queue target, not one per
  source table. You must create the topic in advance; Wirekite exits
  with an error at startup if it does not exist.
</ResponseField>

<ResponseField name="queueBatchSize" type="integer" default="1">
  Number of `.ckt` change-record lines packed into a single broker
  message. `1` means one line per message (highest fidelity, highest
  message count). Higher values reduce broker RPC volume but increase
  per-message size. See "Message compilation" below for the exact
  packing rules.
</ResponseField>

### Kafka / Redpanda parameters

<ResponseField name="queueBrokers" type="string" required>
  Comma-separated list of broker addresses (e.g.
  `broker1.example.com:9092,broker2.example.com:9092`). The first
  broker is dialed at startup as a connectivity check; subsequent
  metadata is discovered from the cluster.
</ResponseField>

<ResponseField name="queueCompression" type="string" default="">
  Wire-format compression codec. One of `""` (no compression), `none`,
  `snappy`, `lz4`, `zstd`. Unknown codecs are rejected at startup.
</ResponseField>

<Warning>
  Authentication (SASL/TLS) is **not** supported in the current release.
  Plaintext brokers only. If you need SASL today, terminate it in front of
  the broker (e.g. with a sidecar proxy) until first-class auth lands.
</Warning>

### Google Pub/Sub parameters

<ResponseField name="queueProject" type="string" required>
  GCP project id that owns the topic. Used as the project segment in
  `projects/<project>/topics/<topic>`.
</ResponseField>

<ResponseField name="queueCredentialsFile" type="string" required>
  Absolute path to a service-account JSON key on the extractor host.
  The plaintext file is encrypted in place by the API at queue-target
  creation time, so the unencrypted contents do not need to remain on
  disk afterwards. Empty value falls back to Application Default
  Credentials.
</ResponseField>

<Note>
  The `queueCompression` field is **rejected** on Pub/Sub queue targets —
  gRPC handles transport compression transparently. Setting it surfaces a
  config typo rather than silently ignoring it.
</Note>
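The parameter rules above can be collected into a small validation sketch. It is illustrative only: `validate_queue_target` and the dict-based config shape are hypothetical and are not Wirekite's API. The parameter names and rules come from this section. The positive-integer check on `queueBatchSize` and the choice to reject only a non-empty `queueCompression` on Pub/Sub are assumptions.

```python
KAFKA_TYPES = {"kafka", "redpanda"}
CODECS = {"", "none", "snappy", "lz4", "zstd"}


def validate_queue_target(cfg: dict) -> dict:
    """Return a normalized copy of cfg or raise ValueError.

    Hypothetical helper: mirrors the documented rules, not Wirekite's code.
    """
    out = dict(cfg)
    # An empty queueType defaults to kafka for backward compatibility.
    out["queueType"] = out.get("queueType") or "kafka"
    qtype = out["queueType"]
    if qtype not in KAFKA_TYPES | {"pubsub"}:
        raise ValueError(f"queueType: unsupported backend {qtype!r}")
    if not out.get("queueTopic"):
        raise ValueError("queueTopic is required")
    batch = out.setdefault("queueBatchSize", 1)
    if not isinstance(batch, int) or batch < 1:  # assumed constraint
        raise ValueError("queueBatchSize must be a positive integer")
    if qtype in KAFKA_TYPES:
        if not out.get("queueBrokers"):
            raise ValueError("queueBrokers is required for kafka/redpanda")
        if out.get("queueCompression", "") not in CODECS:
            raise ValueError("queueCompression: unsupported codec")
    else:
        if not out.get("queueProject"):
            raise ValueError("queueProject is required for pubsub")
        # Rejected rather than ignored, so a config typo surfaces early.
        if out.get("queueCompression"):
            raise ValueError("queueCompression is not valid for pubsub")
    return out
```

Running a check like this before submitting a config catches the same classes of mistake that the extractor would otherwise report at startup.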

## Message compilation

The change extractor produces a stream of native Wirekite `.ckt` change
record lines, each line representing one row event in the form:

```
OP\tschema\ttable\t<column data...>\n
```

`OP` is one of `I` (insert), `U` (update), `D` (delete). Every line is
self-contained and carries the full row image as required by the
downstream change loader.
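As a rough illustration, a consumer could split a line into its leading fields as follows. `ChangeRecord` and `parse_ckt_line` are hypothetical names. The sketch also assumes that column values are tab-separated and contain no embedded tabs or escapes; the actual column encoding is not specified here, so the column fields are left undecoded.

```python
from typing import NamedTuple


class ChangeRecord(NamedTuple):
    op: str        # "I", "U", or "D"
    schema: str
    table: str
    columns: list  # raw column fields, left undecoded


def parse_ckt_line(line: str) -> ChangeRecord:
    """Split one .ckt line into its documented leading fields."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) < 3:
        raise ValueError("expected at least OP, schema, and table")
    op, schema, table, *columns = fields
    if op not in ("I", "U", "D"):
        raise ValueError(f"unknown operation {op!r}")
    return ChangeRecord(op, schema, table, columns)
```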

### Per-line vs batched messages

When `queueBatchSize=1`, every `.ckt` line becomes one queue message.
When `queueBatchSize=N` (N > 1), the publisher accumulates lines in an
internal buffer and emits one message holding `N` newline-terminated
`.ckt` records once the buffer is full. A flush at a commit boundary
emits whatever is buffered, so a message can hold fewer than `N`
records. The downstream consumer demultiplexes by splitting the
message body on `\n`.
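The packing and unpacking described above can be sketched as a round trip. `pack_lines` and `split_message` are hypothetical helpers, not Wirekite functions; the sketch only assumes that every record ends in a single `\n` and contains no other newline.

```python
def pack_lines(lines, batch_size):
    """Yield message bodies holding up to batch_size newline-terminated lines."""
    buffer = []
    for line in lines:
        buffer.append(line if line.endswith("\n") else line + "\n")
        if len(buffer) == batch_size:
            yield "".join(buffer)
            buffer = []
    if buffer:  # final partial batch
        yield "".join(buffer)


def split_message(body):
    """Consumer side: recover the individual lines from one message body."""
    parts = body.split("\n")
    if parts and parts[-1] == "":
        parts.pop()  # the body ends with "\n", so drop the empty tail
    return [part + "\n" for part in parts]
```

Splitting strictly on `\n` (rather than on any line separator) keeps the consumer aligned with the way the publisher terminates records.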

Batching trade-offs:

| Concern              | `queueBatchSize=1`          | `queueBatchSize=100`                           |
| -------------------- | --------------------------- | ---------------------------------------------- |
| Broker RPC count     | One per row                 | One per \~100 rows                             |
| Per-message overhead | Dominates at high row rates | Amortized                                      |
| Failure blast radius | One row per failed publish  | Up to `queueBatchSize` rows per failed publish |
| Consumer parallelism | Per-row                     | Per-batch                                      |

For high-volume migrations, `queueBatchSize=100` is a typical sweet spot;
beyond that, broker per-message limits start to bind on wide rows.
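A back-of-the-envelope estimate shows how row width interacts with the per-message limit. The helper below is a hypothetical sketch: the limit is whatever your broker is configured to accept, and the 80% headroom is an arbitrary illustrative value rather than a Wirekite setting.

```python
def max_safe_batch_size(avg_line_bytes: int, message_limit_bytes: int,
                        headroom_pct: int = 80) -> int:
    """Largest batch size whose estimated body stays under the limit.

    headroom_pct leaves slack for rows wider than average and for
    protocol overhead; 80 is an illustrative value only.
    """
    if avg_line_bytes <= 0:
        raise ValueError("avg_line_bytes must be positive")
    budget = message_limit_bytes * headroom_pct // 100
    return max(1, budget // avg_line_bytes)
```

With a hypothetical 1,000,000-byte limit, 500-byte lines leave room for 1,600 lines per message, while 20,000-byte lines leave room for only 40, which is already below a batch size of 100.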

### Ordering and partitioning

Change records for a given source row must arrive at the consumer in
commit order, otherwise CDC semantics break (e.g. an `UPDATE` arriving
before its preceding `INSERT`). Wirekite preserves per-table order:

| Backend          | Ordering mechanism                                                                                                                                                                                                                                                |
| ---------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Kafka / Redpanda | Messages within a single produce call are ordered. The default `LeastBytes` balancer spreads messages across partitions, favoring the partition that has received the fewest bytes; the consumer's per-partition order matches per-table publish order because `FlushAll` ships one transaction's worth of lines as one ordered batch. |
| Google Pub/Sub   | Each message carries `OrderingKey = "<schema>.<table>"`. The publisher sets `EnableMessageOrdering=true` on the topic; the subscription must be created with `--enable-message-ordering`.                                                                         |

When `queueBatchSize > 1` and the backend is Pub/Sub, lines are accumulated
in **per-table** batchers — a single Pub/Sub message never mixes rows
from different tables, because every message has exactly one ordering
key. Kafka does not have this constraint; Kafka batches may contain
mixed-table rows because per-partition order is sufficient.
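The per-table batching used for Pub/Sub can be sketched as follows. `PerTableBatcher` is a hypothetical class that collects `(ordering_key, body)` pairs in a list instead of calling a real Pub/Sub client. It assumes each input line is already newline-terminated.

```python
from collections import defaultdict


class PerTableBatcher:
    """Accumulate lines per table; each emitted message has one ordering key."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.buffers = defaultdict(list)  # ordering key -> pending lines
        self.out = []                     # (ordering_key, body) ready to publish

    def add(self, line):
        _op, schema, table = line.split("\t", 3)[:3]
        key = f"{schema}.{table}"
        self.buffers[key].append(line)
        if len(self.buffers[key]) == self.batch_size:
            self._emit(key)

    def flush_all(self):
        """Emit every partial batch, for example at a commit boundary."""
        for key in list(self.buffers):
            self._emit(key)

    def _emit(self, key):
        lines = self.buffers.pop(key)
        if lines:
            self.out.append((key, "".join(lines)))
```

Because each buffer is keyed by `<schema>.<table>`, a message body can never mix tables, and the key doubles as the ordering key attached to the message.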

### Flush boundaries

`FlushAll` is called at every source-side **commit boundary** — a SQL
transaction in MySQL/Postgres/Oracle, a Log Sequence Number group in
SQL Server. This guarantees:

* A message never contains rows from two different transactions.
* Counters never include a partial transaction.
* Downstream consumers see committed state only.
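
The first guarantee can be illustrated with a minimal sketch. `messages_for_transactions` is a hypothetical function that models a Kafka-style publisher, where a batch may mix tables; it takes the lines of each transaction as a separate list, which stands in for the commit boundaries the extractor observes.

```python
def messages_for_transactions(transactions, batch_size):
    """Pack each transaction's lines separately so no message spans a commit."""
    messages = []
    for txn_lines in transactions:
        for start in range(0, len(txn_lines), batch_size):
            chunk = txn_lines[start:start + batch_size]
            messages.append("".join(chunk))
        # Commit boundary: nothing is carried over into the next transaction.
    return messages
```

A three-row transaction followed by a one-row transaction, with a batch size of 2, yields three messages: two for the first transaction and one for the second.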

## Retries

Wirekite delegates retry policy to the underlying SDK on each backend.
The Wirekite layer itself does **not** retry — it relies on the SDK's
internal retry budget and treats an exhausted-retries error as terminal.

| Backend                         | Retry layer                              | Retryable conditions                                   | Budget                                 |
| ------------------------------- | ---------------------------------------- | ------------------------------------------------------ | -------------------------------------- |
| Kafka / Redpanda (Go)           | `kafka-go` `Writer.MaxAttempts=10`       | Broker transient errors per the Kafka protocol         | 10 attempts                            |
| Kafka / Redpanda (SQL Server C) | `librdkafka` `message.timeout.ms=300000` | Same as above plus producer queue retry                | 300 s wall clock                       |
| Google Pub/Sub                  | `cloud.google.com/go/pubsub` + `gax`     | `Unavailable`, `DeadlineExceeded`, `ResourceExhausted` | Exponential backoff until SDK gives up |

Once the SDK signals a permanent failure, Wirekite's behavior is
**fail-fast**:

1. Increment the `errors` counter.
2. Log a `FATAL` line naming the failure.
3. Exit with code `1`.

The orchestrator detects the non-zero exit, flips the migration to
**Replication Failed**, and surfaces the failure in the UX. Recovery is
through **Resume from Failed**, which rewinds to the last persisted
progress-DB position and re-emits from there. This produces at-least-once
delivery semantics — a small window of records may be re-published after
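a failure. Downstream consumers must therefore tolerate duplicates.
Pub/Sub's `enable-exactly-once-delivery` subscription flag suppresses
redelivery of already-acknowledged messages, but re-published records
receive new message IDs, so consumer-side deduplication is still needed
after a resume.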

<Note>
  The fail-fast policy is a deliberate design choice. CDC streams are
  strongly ordered; silently dropping or reordering a record corrupts the
  downstream replica. Loud failure plus replay from the last commit
  boundary is the only safe recovery shape.
</Note>
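
One way for a consumer to tolerate replayed records is to apply changes idempotently, which works when every line carries a full row image. The sketch below is hypothetical: `apply_change` writes into an in-memory dict, and it assumes that the first column is the primary key and that updates never change it.

```python
def apply_change(store: dict, line: str) -> None:
    """Apply one .ckt line so that re-applying it leaves the same state."""
    op, schema, table, *columns = line.rstrip("\n").split("\t")
    key = (schema, table, columns[0])  # assumption: first column is the key
    if op in ("I", "U"):
        store[key] = columns           # upsert the full row image
    elif op == "D":
        store.pop(key, None)           # deleting a missing row is a no-op
    else:
        raise ValueError(f"unknown operation {op!r}")
```

Replaying a suffix of the stream in its original order converges to the same final state, because the last operation on each key is unchanged. Intermediate states may briefly regress while the replay catches up.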

### Stall watchdog (Pub/Sub only)

The Google Pub/Sub publisher has an additional safety net for the case
where the SDK's per-message future never resolves — neither succeeding
nor failing. A watchdog ticks every 5 seconds and tracks whether the
`delivered` counter is advancing. If it has been frozen for 30 seconds
**while there are unresolved publishes**, the extractor logs a
`FATAL: QUEUE stall detected` line and exits. The orchestrator then
treats this the same way as any other publish failure (Replication
Failed → Resume from Failed). Kafka does not need an equivalent
watchdog — `librdkafka`'s `message.timeout.ms` already covers this
failure mode.
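
The watchdog logic can be sketched as a small state machine. `StallWatchdog` is a hypothetical class, not the extractor's implementation; it takes the current time as an argument so that the behaviour is easy to test, and it returns `True` where the real extractor would log the `FATAL` line and exit.

```python
class StallWatchdog:
    """Detect a delivered counter that stops advancing while work is pending."""

    def __init__(self, stall_after=30.0):
        self.stall_after = stall_after
        self.last_delivered = 0
        self.last_progress = None  # time of the last observed advance

    def tick(self, now, delivered, unresolved):
        """Return True if a stall should be reported at time `now` (seconds)."""
        progressed = delivered != self.last_delivered
        if self.last_progress is None or progressed or unresolved == 0:
            # Progress was made, or nothing is in flight: reset the timer.
            self.last_delivered = delivered
            self.last_progress = now
            return False
        return now - self.last_progress >= self.stall_after
```

Called every 5 seconds with a frozen counter and pending publishes, `tick` first returns `True` on the call made 30 seconds after the last observed progress. An idle publisher with nothing in flight never trips it.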

## Charts and operational signals

The Replication pane in the Wirekite UX shows a **Queue** subpane for
every migration that has a queue target. It surfaces three series on
one chart:

| Series          | Source                                      | Meaning                                                |
| --------------- | ------------------------------------------- | ------------------------------------------------------ |
| `Extracted/min` | `EXTRACTED.rows` field on the extractor log | `.ckt` lines accepted into the publisher's local queue |
| `Delivered/min` | `EXTRACTED.delivered` field                 | `.ckt` lines durably acknowledged by the broker        |
| `Errors`        | `EXTRACTED.queue_errors` field              | Cumulative publish errors (steady state = 0)           |

In a healthy migration `Delivered/min` tracks `Extracted/min` closely;
sustained divergence indicates broker back-pressure. `Errors > 0` is
always followed by extractor exit (fail-fast), so a non-zero point on
the errors series is the marker for a failure incident.
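
If you export the two per-minute series for alerting, a simple check for sustained divergence might look like the sketch below. `sustained_lag` is a hypothetical helper, and both the 0.9 ratio and the 5-minute window are illustrative thresholds rather than Wirekite defaults.

```python
def sustained_lag(extracted, delivered, ratio=0.9, window=5):
    """True if delivered < ratio * extracted for `window` consecutive minutes.

    ratio and window are illustrative thresholds, not Wirekite defaults.
    """
    run = 0
    for ext, dlv in zip(extracted, delivered):
        run = run + 1 if dlv < ratio * ext else 0
        if run >= window:
            return True
    return False
```

Requiring consecutive minutes avoids alerting on a single slow interval, which is normal when a large transaction is flushed.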

<Note>
  The publishers run in **async** mode on the Go side (`Async: true` for
  Kafka, completion-worker pool for Pub/Sub), so `submitted` (Extracted)
  and `delivered` are tracked separately. On the SQL Server C extractor
  both counters are wired through `librdkafka`'s delivery-report
  callback (Kafka) or google-cloud-cpp's per-publish future (Pub/Sub),
  producing the same semantics. The chart schema is identical across
  all four extractors.
</Note>

The shutdown log line names the lifetime totals:

```
QUEUE: shutdown (submitted=N, delivered=N, errors=0)
```

`submitted` and `delivered` should match exactly when a run completes
cleanly. A non-zero `errors` always co-occurs with a `FATAL` line
earlier in the log.
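
A post-run check can parse this line and confirm the totals. `clean_shutdown` is a hypothetical helper; it assumes the `N` placeholders are plain decimal integers, as in the format shown above.

```python
import re

SHUTDOWN_RE = re.compile(
    r"QUEUE: shutdown \(submitted=(\d+), delivered=(\d+), errors=(\d+)\)"
)


def clean_shutdown(log_line: str) -> bool:
    """True if the shutdown totals match and no errors were recorded."""
    m = SHUTDOWN_RE.search(log_line)
    if m is None:
        raise ValueError("not a QUEUE shutdown line")
    submitted, delivered, errors = map(int, m.groups())
    return submitted == delivered and errors == 0
```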

## Error conditions

| Symptom                                                               | Likely cause                                                                                  | Recovery                                                  |
| --------------------------------------------------------------------- | --------------------------------------------------------------------------------------------- | --------------------------------------------------------- |
| Extractor exits at startup with `cannot connect to broker`            | Bad `queueBrokers`; firewall; broker down                                                     | Verify broker reachability, fix config, restart           |
| Extractor exits at startup with `topic … does not exist`              | Topic not pre-created on the broker                                                           | Create the topic, then resume                             |
| Extractor exits at startup with `queueCompression: unsupported codec` | Typo in compression value                                                                     | Use one of `none`, `snappy`, `lz4`, `zstd`                |
| Extractor exits mid-run with `FATAL: QUEUE … failed`                  | SDK exhausted retries on a publish                                                            | Resume from Failed; investigate broker-side cause         |
| Extractor exits mid-run with `FATAL: QUEUE stall detected` (pubsub)   | Pub/Sub future never resolved                                                                 | Resume from Failed; check Pub/Sub quotas / IAM            |
| `Delivered/min` lags `Extracted/min` for sustained periods            | Broker back-pressure or network latency                                                       | Investigate broker capacity; consider raising `queueBatchSize` |
| Consumer sees out-of-order rows for one table                         | Pub/Sub subscription created without `--enable-message-ordering`                              | Recreate the subscription; the flag cannot be changed after creation |
| Consumer sees duplicate rows                                          | Resume from Failed replayed; or Pub/Sub subscription without `--enable-exactly-once-delivery` | Add consumer-side dedup, or recreate the subscription     |
| Queue target deletion leaves topic on the broker                      | Expected — Wirekite removes its reference only                                                | Delete the topic out-of-band if desired                   |

For backend-specific setup (broker bring-up, IAM, subscription flags),
continue to the next pages.
