Kafka (Commercial)

About Kafka

Apache Kafka is a distributed event streaming platform built for durable, high-throughput messaging. In Specmatic, Kafka support is driven by your AsyncAPI contract:

  • AsyncAPI describes the topics, headers, payloads, and examples.
  • Specmatic starts a Kafka mock that subscribes to request topics from that contract.
  • When an incoming message matches the contract, Specmatic can publish the reply event defined in the same contract.
  • A small HTTP API lets you snapshot message counts and verify what happened during the test run.

Defining Kafka In AsyncAPI

Declare a Kafka server and attach it to the channels that use it.

servers:
  kafkaServer:
    host: "localhost:9092"
    protocol: "kafka"
    description: "Kafka broker"

channels:
  NewOrderPlaced:
    address: "new-orders"
    servers:
      - $ref: "#/servers/kafkaServer"
    messages:
      placeOrder.message:
        $ref: "#/components/messages/OrderRequest"

components:
  messages:
    OrderRequest:
      headers:
        type: object
        properties:
          orderCorrelationId:
            type: string
      payload:
        type: object
        properties:
          orderId:
            type: string

Notes:

  • protocol: kafka tells Specmatic to use Kafka transport.
  • address is the Kafka topic name.
  • Message headers and payload schema are validated against the contract.
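
To make the notes above concrete, here is a minimal sketch of a record that would conform to the OrderRequest message: it targets the new-orders topic, carries the orderCorrelationId header, and has an orderId field in its payload. The JSON payload encoding, the use of the third-party kafka-python client, and the function names are assumptions made for illustration; the contract excerpt does not specify them.

```python
import json


def build_order_message(order_id: str, correlation_id: str) -> dict:
    """Build a record shaped like the OrderRequest message in the contract."""
    return {
        "topic": "new-orders",  # the channel's `address`
        # Kafka headers are (name, bytes) pairs.
        "headers": [("orderCorrelationId", correlation_id.encode("utf-8"))],
        # Assumption: the payload is sent as UTF-8 encoded JSON.
        "value": json.dumps({"orderId": order_id}).encode("utf-8"),
    }


def publish(message: dict, bootstrap_servers: str = "localhost:9092") -> None:
    """Send the record using kafka-python (assumed to be installed)."""
    from kafka import KafkaProducer  # third-party; imported lazily on purpose

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    producer.send(
        message["topic"],
        value=message["value"],
        headers=message["headers"],
    )
    producer.flush()  # block until the record has actually been sent
    producer.close()
```

Calling publish(build_order_message("order-1", "corr-1")) against a running broker would send one such record; a record with a missing or wrongly typed field is the kind of message the contract validation is meant to catch.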

Start The Kafka Mock

Create specmatic.yaml:

# specmatic.yaml
version: 3
dependencies:
  services:
    - service:
        $ref: "#/components/services/kafkaService"
        runOptions:
          $ref: "#/components/runOptions/kafkaServiceMock"
components:
  sources:
    yourContracts:
      git:
        url: https://your-central-contract-repo.com
  services:
    kafkaService:
      description: Kafka AsyncAPI Service
      definitions:
        - definition:
            source:
              $ref: "#/components/sources/yourContracts"
            specs:
              - spec:
                  path: /path/to/your/kafka.yaml
  runOptions:
    kafkaServiceMock:
      asyncapi:
        type: mock
        inMemoryBroker:
          host: localhost
          port: 9092
        servers:
          - host: localhost:9092
            protocol: kafka

This config tells Specmatic where to load the AsyncAPI contract from and how to run the async mock.

Start the mock with the embedded in-memory Kafka broker:

docker run --network host -v "$(pwd):/usr/src/app" specmatic/enterprise mock

Typical logs:

Setting up listeners
Listening on topics: (product-queries)
Starting API server on port: 9999

If you want Specmatic to use an existing Kafka broker instead of the embedded one:

docker run --network host -v "$(pwd):/usr/src/app" specmatic/enterprise --external-broker-url localhost:9092 mock

If you want the HTTP API on a different port, set API_SERVER_PORT:

docker run --network host \
  -e API_SERVER_PORT=3000 \
  -v "$(pwd):/usr/src/app" \
  specmatic/enterprise mock

--network host is important when using localhost for the broker address because it allows the container to reach services on the host directly.
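
If the port is overridden with API_SERVER_PORT, test code that calls the HTTP API has to use the same value. A minimal sketch, assuming the test process is given the same API_SERVER_PORT variable as the container and that the API is reachable on localhost; the helper name api_base_url is hypothetical:

```python
import os


def api_base_url(default_port: int = 9999) -> str:
    """Return the mock's HTTP API base URL, honouring API_SERVER_PORT if set."""
    port = os.environ.get("API_SERVER_PORT", str(default_port))
    return f"http://localhost:{port}"
```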

API Server

The Kafka mock exposes a small HTTP API on http://localhost:9999.

Typical flow:

  1. Start the mock.
  2. Capture a snapshot.
  3. Exercise your producer or service.
  4. Verify the channels or examples you care about.
  5. Stop the mock.

OpenAPI summary:

openapi: 3.0.0
info:
  title: Specmatic Kafka Mock Server API
  version: 1.0.0
servers:
  - url: http://localhost:9999
paths:
  /_specmatic/snapshot:
    post:
      summary: Captures the current message-count snapshot for all known channels.
      responses:
        '200':
          description: Snapshot captured successfully
  /_specmatic/verify:
    get:
      summary: Verifies counts for the requested channels or example ids.
      parameters:
        - in: query
          name: channels
          required: false
          schema:
            type: string
        - in: query
          name: exampleIds
          required: false
          schema:
            type: string
      responses:
        '200':
          description: Verification counts
          content:
            application/json:
              schema:
                type: object
                additionalProperties:
                  type: integer
        '400':
          description: Invalid query parameter usage
        '404':
          description: No active snapshot exists for the requested channel
  /_specmatic/stop:
    post:
      summary: Dumps reports and stops the async mock.
      responses:
        '200':
          description: Stop initiated

Capture A Snapshot

curl -X POST http://localhost:9999/_specmatic/snapshot

This records the current count for all known channels. Verification is always done against messages received after the latest snapshot.
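
The snapshot semantics can be pictured with a toy model: each verify result is the running total minus the total recorded at the latest snapshot. The class below is only an illustration of that arithmetic under this reading of the text; it is not Specmatic's implementation, and the class and method names are hypothetical.

```python
class SnapshotCounter:
    """Toy model of snapshot-relative message counting."""

    def __init__(self, channels):
        self._totals = {name: 0 for name in channels}
        self._baseline = None  # no snapshot captured yet

    def record(self, channel):
        """Count one message received on a channel."""
        self._totals[channel] += 1

    def snapshot(self):
        """Remember the current totals as the new baseline."""
        self._baseline = dict(self._totals)

    def verify(self, channels):
        """Return messages received per channel since the latest snapshot."""
        if self._baseline is None:
            # Corresponds to the 404 case: no active snapshot.
            raise LookupError("no active snapshot")
        return {c: self._totals[c] - self._baseline[c] for c in channels}


counter = SnapshotCounter(["place-order", "order-confirmed"])
counter.record("place-order")      # arrives before the snapshot, so not counted
counter.snapshot()
counter.record("place-order")
counter.record("order-confirmed")
print(counter.verify(["place-order", "order-confirmed"]))
# {'place-order': 1, 'order-confirmed': 1}
```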

Verify By Channel

curl "http://localhost:9999/_specmatic/verify?channels=place-order,order-confirmed"

Example response:

{
  "place-order": 1,
  "order-confirmed": 1
}

Verify By Example Id

curl "http://localhost:9999/_specmatic/verify?exampleIds=success-case,invalid-order"

Example response:

{
  "success-case": 2,
  "invalid-order": 0
}
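
In a test, the verify response is usually compared with the counts you expect. One possible way to do that is sketched below; the helper name count_mismatches is hypothetical, and the sample body is the example response shown above.

```python
import json


def count_mismatches(response_body: str, expected: dict) -> dict:
    """Return {name: (expected, actual)} for every count that differs.

    A name missing from the response is reported with an actual value of None.
    """
    actual = json.loads(response_body)
    return {
        name: (want, actual.get(name))
        for name, want in expected.items()
        if actual.get(name) != want
    }


body = '{"success-case": 2, "invalid-order": 0}'
print(count_mismatches(body, {"success-case": 2, "invalid-order": 0}))  # {}
print(count_mismatches(body, {"success-case": 1}))  # {'success-case': (1, 2)}
```

An empty result means every expected count matched, so a test can simply assert that the returned dictionary is empty.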

Stop The Mock

curl -X POST http://localhost:9999/_specmatic/stop

There is also a legacy endpoint:

curl -X POST http://localhost:9999/stop

/stop is deprecated and scheduled for removal on 2026-07-01. Use /_specmatic/stop.

API Rules

  • /_specmatic/verify accepts exactly one query parameter per request.
  • The supported query parameters are channels and exampleIds; send one or the other, not both.
  • Values are comma-separated.
  • Unknown query parameter names return 400 Bad Request.
  • Verifying a channel before a snapshot exists for it returns 404 Not Found.
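
The rules above can also be checked on the client side before a request is sent, which gives a clearer error than a 400 from the mock. This is a sketch only: the function name build_verify_url is hypothetical, and the checks mirror the listed rules rather than the mock's actual validation code.

```python
from urllib.parse import urlencode

SUPPORTED_PARAMS = ("channels", "exampleIds")


def build_verify_url(base_url: str, **params) -> str:
    """Build a /_specmatic/verify URL from exactly one supported parameter."""
    if len(params) != 1:
        raise ValueError("pass exactly one of: channels, exampleIds")
    name, values = next(iter(params.items()))
    if name not in SUPPORTED_PARAMS:
        raise ValueError(f"unsupported query parameter: {name}")
    if isinstance(values, str) or not values:
        raise ValueError(f"{name} must be a non-empty list of names")
    # Values are comma-separated; keep the commas unescaped, as in the curl examples.
    query = urlencode({name: ",".join(values)}, safe=",")
    return f"{base_url}/_specmatic/verify?{query}"


print(build_verify_url("http://localhost:9999", channels=["place-order", "order-confirmed"]))
# http://localhost:9999/_specmatic/verify?channels=place-order,order-confirmed
```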

A Simple Test Flow

# 1. Start the mock (docker run stays in the foreground; run the remaining steps from another terminal)
docker run --network host -v "$(pwd):/usr/src/app" specmatic/enterprise mock

# 2. Capture the snapshot
curl -X POST http://localhost:9999/_specmatic/snapshot

# 3. Exercise your app or publish to Kafka

# 4. Verify the channels that should have received messages
curl "http://localhost:9999/_specmatic/verify?channels=place-order,order-confirmed"

# 5. Stop the mock
curl -X POST http://localhost:9999/_specmatic/stop
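
The same flow can be driven from a test script. The sketch below uses only the Python standard library and the endpoints documented above. It assumes the mock is already running on http://localhost:9999, and it polls the verify endpoint because Kafka delivery is asynchronous, so a verify call made immediately after step 3 may run before the messages have been counted. The polling approach, the timeout values, and all function names are assumptions for illustration, not part of Specmatic.

```python
import json
import time
import urllib.request

BASE_URL = "http://localhost:9999"  # default API server address from this page


def post(path: str) -> int:
    """POST to the mock's HTTP API and return the status code."""
    request = urllib.request.Request(BASE_URL + path, method="POST")
    with urllib.request.urlopen(request) as response:
        return response.status


def fetch_counts(query: str) -> dict:
    """GET /_specmatic/verify with the given query string and parse the JSON body."""
    with urllib.request.urlopen(f"{BASE_URL}/_specmatic/verify?{query}") as response:
        return json.load(response)


def wait_for_counts(expected: dict, fetch, timeout: float = 10.0, interval: float = 0.5) -> dict:
    """Poll fetch() until every expected count is reached, or raise TimeoutError.

    Returns the last counts seen so the caller can still assert exact equality.
    """
    deadline = time.monotonic() + timeout
    while True:
        counts = fetch()
        if all(counts.get(name, 0) >= want for name, want in expected.items()):
            return counts
        if time.monotonic() >= deadline:
            raise TimeoutError(f"expected {expected}, last saw {counts}")
        time.sleep(interval)


def run_flow(exercise) -> dict:
    """Snapshot, run the caller's exercise() step, verify, then stop the mock."""
    post("/_specmatic/snapshot")
    try:
        exercise()  # step 3: exercise your app or publish to Kafka
        return wait_for_counts(
            {"place-order": 1, "order-confirmed": 1},
            lambda: fetch_counts("channels=place-order,order-confirmed"),
        )
    finally:
        post("/_specmatic/stop")
```

Passing the fetch step in as a callable keeps wait_for_counts independent of the network, so it can be unit-tested with a stub.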