Kafka Commercial
About Kafka
Apache Kafka is a distributed event streaming platform built for durable, high-throughput messaging. In Specmatic, Kafka support is driven by your AsyncAPI contract:
- AsyncAPI describes the topics, headers, payloads, and examples.
- Specmatic starts a Kafka mock that subscribes to request topics from that contract.
- When an incoming message matches the contract, Specmatic can publish the reply event defined in the same contract.
- A small HTTP API lets you snapshot message counts and verify what happened during the test run.
Defining Kafka In AsyncAPI
Declare a Kafka server and attach it to the channels that use it.
servers:
  kafkaServer:
    host: "localhost:9092"
    protocol: "kafka"
    description: "Kafka broker"
channels:
  NewOrderPlaced:
    address: "new-orders"
    servers:
      - $ref: "#/servers/kafkaServer"
    messages:
      placeOrder.message:
        $ref: "#/components/messages/OrderRequest"
components:
  messages:
    OrderRequest:
      headers:
        type: object
        properties:
          orderCorrelationId:
            type: string
      payload:
        type: object
        properties:
          orderId:
            type: string
Notes:
- protocol: kafka tells Specmatic to use Kafka transport.
- address is the Kafka topic name.
- Message headers and payload schema are validated against the contract.
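To make the contract concrete, here is a minimal Python sketch of a message that would fit the OrderRequest schema above. The function name build_order_message and the sample values are hypothetical. The result is a plain dictionary that you would hand to whichever Kafka client library you use; the byte-encoded header value reflects what Kafka clients typically expect.

```python
import json


def build_order_message(order_id: str, correlation_id: str) -> dict:
    """Build a message shaped like OrderRequest in the contract above.

    Hypothetical helper: it only assembles data and does not talk to Kafka.
    """
    if not isinstance(order_id, str) or not isinstance(correlation_id, str):
        # The contract declares both fields as type: string.
        raise TypeError("orderId and orderCorrelationId must be strings")
    return {
        "topic": "new-orders",  # the channel's address
        "headers": [("orderCorrelationId", correlation_id.encode("utf-8"))],
        "value": json.dumps({"orderId": order_id}).encode("utf-8"),
    }


message = build_order_message("order-123", "corr-456")
print(message["topic"], message["headers"], message["value"])
```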
Start The Kafka Mock
Create specmatic.yaml:
YAML:
version: 3
dependencies:
  services:
    - service:
        $ref: "#/components/services/kafkaService"
        runOptions:
          $ref: "#/components/runOptions/kafkaServiceMock"
components:
  sources:
    yourContracts:
      git:
        url: https://your-central-contract-repo.com
  services:
    kafkaService:
      description: Kafka AsyncAPI Service
      definitions:
        - definition:
            source:
              $ref: "#/components/sources/yourContracts"
            specs:
              - spec:
                  path: /path/to/your/kafka.yaml
  runOptions:
    kafkaServiceMock:
      asyncapi:
        type: mock
        inMemoryBroker:
          host: localhost
          port: 9092
        servers:
          - host: localhost:9092
            protocol: kafka
JSON:
{
  "version": 3,
  "dependencies": {
    "services": [
      {
        "service": {
          "$ref": "#/components/services/kafkaService",
          "runOptions": {
            "$ref": "#/components/runOptions/kafkaServiceMock"
          }
        }
      }
    ]
  },
  "components": {
    "sources": {
      "yourContracts": {
        "git": {
          "url": "https://your-central-contract-repo.com"
        }
      }
    },
    "services": {
      "kafkaService": {
        "description": "Kafka AsyncAPI Service",
        "definitions": [
          {
            "definition": {
              "source": {
                "$ref": "#/components/sources/yourContracts"
              },
              "specs": [
                {
                  "spec": {
                    "path": "/path/to/your/kafka.yaml"
                  }
                }
              ]
            }
          }
        ]
      }
    },
    "runOptions": {
      "kafkaServiceMock": {
        "asyncapi": {
          "type": "mock",
          "inMemoryBroker": {
            "host": "localhost",
            "port": 9092
          },
          "servers": [
            {
              "host": "localhost:9092",
              "protocol": "kafka"
            }
          ]
        }
      }
    }
  }
}
This config tells Specmatic where to load the AsyncAPI contract from and how to run the async mock.
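The $ref values in this config are local pointers into the same document. As a rough illustration of how they connect the pieces, the sketch below follows a pointer through a trimmed copy of the JSON config. resolve_ref is a hypothetical helper, not part of Specmatic, and it ignores JSON Pointer escape sequences.

```python
import json

# Trimmed copy of the config above, kept only to demonstrate the pointers.
CONFIG = json.loads("""
{
  "dependencies": {"services": [{"service": {
    "$ref": "#/components/services/kafkaService",
    "runOptions": {"$ref": "#/components/runOptions/kafkaServiceMock"}
  }}]},
  "components": {
    "services": {"kafkaService": {"description": "Kafka AsyncAPI Service"}},
    "runOptions": {"kafkaServiceMock": {"asyncapi": {
      "type": "mock",
      "inMemoryBroker": {"host": "localhost", "port": 9092}
    }}}
  }
}
""")


def resolve_ref(document: dict, ref: str):
    """Follow a local '#/a/b/c' pointer through nested dictionaries."""
    if not ref.startswith("#/"):
        raise ValueError(f"only local references are supported: {ref}")
    node = document
    for key in ref[2:].split("/"):
        node = node[key]
    return node


service = CONFIG["dependencies"]["services"][0]["service"]
run_options = resolve_ref(CONFIG, service["runOptions"]["$ref"])
print(run_options["asyncapi"]["inMemoryBroker"])
```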
Start the mock with the embedded in-memory Kafka broker:
docker run --network host -v "$(pwd):/usr/src/app" specmatic/enterprise mock
Typical logs:
Setting up listeners
Listening on topics: (product-queries)
Starting API server on port: 9999
If you want Specmatic to use an existing Kafka broker instead of the embedded one:
docker run --network host -v "$(pwd):/usr/src/app" specmatic/enterprise --external-broker-url localhost:9092 mock
If you want the HTTP API on a different port, set API_SERVER_PORT:
docker run --network host \
  -e API_SERVER_PORT=3000 \
  -v "$(pwd):/usr/src/app" \
  specmatic/enterprise mock
--network host is important when using localhost for the broker address because it allows the container to reach services on the host directly.
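When the mock is started from a script, it can help to wait until the HTTP API accepts connections before sending any requests. The following is a minimal sketch using only the Python standard library; wait_for_port is a hypothetical helper, and the timeout and retry interval are arbitrary choices.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, else False."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.2)  # nothing is listening yet, retry shortly
    return False
```

For example, wait_for_port("localhost", 9999) would block until the API server from the logs above is reachable, or return False after 30 seconds.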
API Server
The Kafka mock exposes a small HTTP API on http://localhost:9999.
Typical flow:
- Start the mock.
- Capture a snapshot.
- Exercise your producer or service.
- Verify the channels or examples you care about.
- Stop the mock.
OpenAPI summary:
openapi: 3.0.0
info:
  title: Specmatic Kafka Mock Server API
  version: 1.0.0
servers:
  - url: http://localhost:9999
paths:
  /_specmatic/snapshot:
    post:
      summary: Captures the current message-count snapshot for all known channels.
      responses:
        '200':
          description: Snapshot captured successfully
  /_specmatic/verify:
    get:
      summary: Verifies counts for the requested channels or example ids.
      parameters:
        - in: query
          name: channels
          required: false
          schema:
            type: string
        - in: query
          name: exampleIds
          required: false
          schema:
            type: string
      responses:
        '200':
          description: Verification counts
          content:
            application/json:
              schema:
                type: object
                additionalProperties:
                  type: integer
        '400':
          description: Invalid query parameter usage
        '404':
          description: No active snapshot exists for the requested channel
  /_specmatic/stop:
    post:
      summary: Dumps reports and stops the async mock.
      responses:
        '200':
          description: Stop initiated
Capture A Snapshot
curl -X POST http://localhost:9999/_specmatic/snapshot
This records the current count for all known channels. Verification is always done against messages received after the latest snapshot.
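One way to picture this: the snapshot stores a baseline per channel, and verification reports the difference from that baseline. The class below is only a mental model of the behaviour described above, not Specmatic's implementation; SnapshotCounter and its methods are hypothetical names.

```python
class SnapshotCounter:
    """Toy model: counts reported by verify are relative to the latest snapshot."""

    def __init__(self):
        self.totals = {}      # messages seen per channel since start
        self.baseline = None  # totals at the time of the latest snapshot

    def receive(self, channel: str) -> None:
        self.totals[channel] = self.totals.get(channel, 0) + 1

    def snapshot(self) -> None:
        self.baseline = dict(self.totals)

    def verify(self, channel: str) -> int:
        if self.baseline is None:
            # Loosely mirrors the documented 404 when no snapshot exists yet.
            raise LookupError(f"no active snapshot for {channel}")
        return self.totals.get(channel, 0) - self.baseline.get(channel, 0)


counter = SnapshotCounter()
counter.receive("place-order")  # arrives before the snapshot, so not counted
counter.snapshot()
counter.receive("place-order")
print(counter.verify("place-order"))  # prints 1
```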
Verify By Channel
curl "http://localhost:9999/_specmatic/verify?channels=place-order,order-confirmed"
Example response:
{
  "place-order": 1,
  "order-confirmed": 1
}
Verify By Example Id
curl "http://localhost:9999/_specmatic/verify?exampleIds=success-case,invalid-order"
Example response:
{
  "success-case": 2,
  "invalid-order": 0
}
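In an automated test you would usually compare the returned counts with the counts you expect. A minimal sketch, assuming the response body has already been read into a string; count_mismatches is a hypothetical helper name.

```python
import json


def count_mismatches(response_body: str, expected: dict) -> dict:
    """Return {name: (expected, actual)} for every count that differs."""
    actual = json.loads(response_body)
    return {
        name: (want, actual.get(name))
        for name, want in expected.items()
        if actual.get(name) != want
    }


body = '{"success-case": 2, "invalid-order": 0}'
print(count_mismatches(body, {"success-case": 2, "invalid-order": 1}))
# prints {'invalid-order': (1, 0)}
```

An empty result means every expected count matched, so a test can simply assert that the returned dictionary is empty.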
Stop The Mock
curl -X POST http://localhost:9999/_specmatic/stop
There is also a legacy endpoint:
curl -X POST http://localhost:9999/stop
/stop is deprecated and scheduled for removal on 2026-07-01. Use /_specmatic/stop.
API Rules
- /_specmatic/verify accepts exactly one query parameter.
- Supported query parameters are channels and exampleIds.
- Values are comma-separated.
- Unknown query parameter names return 400 Bad Request.
- Verifying a channel before a snapshot exists for it returns 404 Not Found.
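These rules are easy to enforce on the client side before a request is sent. The sketch below builds a verify URL and rejects calls that pass both parameters or neither; verify_url and BASE_URL are hypothetical names, and the base address assumes the default port.

```python
from urllib.parse import urlencode

BASE_URL = "http://localhost:9999"  # default API address from this page


def verify_url(channels=None, example_ids=None, base_url=BASE_URL) -> str:
    """Build a /_specmatic/verify URL with exactly one query parameter."""
    if (channels is None) == (example_ids is None):
        raise ValueError("pass either channels or example_ids, not both or neither")
    if channels is not None:
        name, values = "channels", channels
    else:
        name, values = "exampleIds", example_ids
    # Values are comma-separated; keep the comma unescaped as in the curl examples.
    query = urlencode({name: ",".join(values)}, safe=",")
    return f"{base_url}/_specmatic/verify?{query}"


print(verify_url(channels=["place-order", "order-confirmed"]))
```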
A Simple Test Flow
# 1. Start the mock
docker run --network host -v "$(pwd):/usr/src/app" specmatic/enterprise mock
# 2. Capture the snapshot
curl -X POST http://localhost:9999/_specmatic/snapshot
# 3. Exercise your app or publish to Kafka
# 4. Verify the channels that should have received messages
curl "http://localhost:9999/_specmatic/verify?channels=place-order,order-confirmed"
# 5. Stop the mock
curl -X POST http://localhost:9999/_specmatic/stop
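Steps 2 to 5 can also be driven from a test harness. The following is a minimal sketch using only the Python standard library, assuming the mock is already running on the default port. run_flow, http_call, and the exercise callback are hypothetical names; the call parameter exists so the HTTP layer can be swapped out in unit tests.

```python
import json
import urllib.request

BASE_URL = "http://localhost:9999"  # default API address from this page


def http_call(method: str, url: str) -> str:
    """Send a request without a body and return the response text."""
    request = urllib.request.Request(url, method=method)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.read().decode("utf-8")


def run_flow(exercise, channels, base_url=BASE_URL, call=http_call) -> dict:
    """Snapshot, run the exercise callback, verify by channel, then stop the mock."""
    call("POST", f"{base_url}/_specmatic/snapshot")  # step 2
    try:
        exercise()  # step 3: publish to Kafka or drive your app
        query = ",".join(channels)
        body = call("GET", f"{base_url}/_specmatic/verify?channels={query}")  # step 4
        return json.loads(body)
    finally:
        call("POST", f"{base_url}/_specmatic/stop")  # step 5, even if a step fails
```

A test would pass its own exercise function, for example one that publishes an order, and then assert on the returned counts.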