Kafka Commercial
About Kafka
Apache Kafka is a distributed event streaming platform built for durable, high-throughput messaging. In Specmatic, Kafka support is driven by your AsyncAPI contract:
- AsyncAPI describes the topics, headers, payloads, and examples.
- Specmatic starts a Kafka mock that subscribes to request topics from that contract.
- When an incoming message matches the contract, Specmatic can publish the reply event defined in the same contract.
- A small HTTP API lets you snapshot message counts and verify what happened during the test run.
Defining Kafka In AsyncAPI
Declare a Kafka server and attach it to the channels that use it.
servers:
  kafkaServer:
    host: "localhost:9092"
    protocol: "kafka"
    description: "Kafka broker"

channels:
  NewOrderPlaced:
    address: "new-orders"
    servers:
      - $ref: "#/servers/kafkaServer"
    messages:
      placeOrder.message:
        $ref: "#/components/messages/OrderRequest"

components:
  messages:
    OrderRequest:
      headers:
        type: object
        properties:
          orderCorrelationId:
            type: string
      payload:
        type: object
        properties:
          orderId:
            type: string
Notes:
- `protocol: kafka` tells Specmatic to use Kafka transport.
- `address` is the Kafka topic name.
- Message headers and payload schema are validated against the contract.
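To make the contract concrete, here is a sketch of what a conforming producer would send to the `new-orders` topic: a string `orderCorrelationId` header and a JSON payload with a string `orderId`, exactly as the `OrderRequest` message declares. The `build_order_message` and `send_order` helpers are illustrative, not part of Specmatic, and the snippet assumes the `kafka-python` client.

```python
import json


def build_order_message(order_id: str, correlation_id: str):
    """Build Kafka headers and payload matching the OrderRequest message."""
    # The contract declares a string orderCorrelationId header; Kafka headers
    # travel on the wire as (name, bytes) pairs.
    headers = [("orderCorrelationId", correlation_id.encode("utf-8"))]
    # The payload is a JSON object with a string orderId.
    value = json.dumps({"orderId": order_id}).encode("utf-8")
    return headers, value


def send_order(order_id: str, correlation_id: str,
               bootstrap: str = "localhost:9092") -> None:
    # kafka-python is an assumption here; any Kafka client can send the
    # same message shape.
    from kafka import KafkaProducer  # pip install kafka-python

    headers, value = build_order_message(order_id, correlation_id)
    producer = KafkaProducer(bootstrap_servers=bootstrap)
    # "new-orders" is the channel address declared in the contract above.
    producer.send("new-orders", value=value, headers=headers)
    producer.flush()
```

If the message matches the contract, the Specmatic mock validates it and can publish the reply event defined in the same contract.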
Start The Kafka Mock
Create specmatic.yaml:
YAML:
version: 3
dependencies:
  services:
    - service:
        $ref: "#/components/services/kafkaService"
        runOptions:
          $ref: "#/components/runOptions/kafkaServiceMock"
components:
  sources:
    yourContracts:
      git:
        url: https://your-central-contract-repo.com
  services:
    kafkaService:
      description: Kafka AsyncAPI Service
      definitions:
        - definition:
            source:
              $ref: "#/components/sources/yourContracts"
            specs:
              - spec:
                  path: /path/to/your/kafka.yaml
  runOptions:
    kafkaServiceMock:
      asyncapi:
        type: mock
        inMemoryBroker:
          host: localhost
          port: 9092
        servers:
          - host: localhost:9092
            protocol: kafka
JSON:

{
  "version": 3,
  "dependencies": {
    "services": [
      {
        "service": {
          "$ref": "#/components/services/kafkaService",
          "runOptions": {
            "$ref": "#/components/runOptions/kafkaServiceMock"
          }
        }
      }
    ]
  },
  "components": {
    "sources": {
      "yourContracts": {
        "git": {
          "url": "https://your-central-contract-repo.com"
        }
      }
    },
    "services": {
      "kafkaService": {
        "description": "Kafka AsyncAPI Service",
        "definitions": [
          {
            "definition": {
              "source": {
                "$ref": "#/components/sources/yourContracts"
              },
              "specs": [
                {
                  "spec": {
                    "path": "/path/to/your/kafka.yaml"
                  }
                }
              ]
            }
          }
        ]
      }
    },
    "runOptions": {
      "kafkaServiceMock": {
        "asyncapi": {
          "type": "mock",
          "inMemoryBroker": {
            "host": "localhost",
            "port": 9092
          },
          "servers": [
            {
              "host": "localhost:9092",
              "protocol": "kafka"
            }
          ]
        }
      }
    }
  }
}
This config tells Specmatic where to load the AsyncAPI contract from and how to run the async mock.
For the full AsyncAPI configuration fields used here, see Test Configuration and Mock Configuration.
Start the mock with the embedded in-memory Kafka broker:
docker run --network host -v "$(pwd):/usr/src/app" specmatic/enterprise mock
Typical logs:
Setting up listeners
Listening on topics: (new-orders)
Starting API server on port: 9999
If you want Specmatic to use an existing Kafka broker instead of the embedded one:
docker run --network host -v "$(pwd):/usr/src/app" specmatic/enterprise --external-broker-url localhost:9092 mock
If you want the HTTP API on a different port, set API_SERVER_PORT:
docker run --network host \
  -e API_SERVER_PORT=3000 \
  -v "$(pwd):/usr/src/app" \
  specmatic/enterprise mock
--network host is important when using localhost for the broker address because it allows the container to reach services on the host directly.
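The mock takes a moment to start its listeners and HTTP API, so test scripts usually wait for the API server port before snapshotting or publishing. A minimal standard-library sketch; the host, port, and timeout values are assumptions to adjust:

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Poll until a TCP port accepts connections, or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means the server is accepting traffic.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.5)
    return False


# e.g. wait for the mock's HTTP API (default port 9999) before proceeding:
# wait_for_port("localhost", 9999)
```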
Kafka Configuration In specmatic.yaml
For Kafka, the servers entries in runOptions.asyncapi map directly to the async config classes:
- `servers[].host` and `servers[].protocol` select the broker.
- `servers[].adminCredentials` carries Kafka admin client properties.
- `servers[].client.producer` and `servers[].client.consumer` carry Kafka client properties.
- `schemaRegistry` is used when your AsyncAPI payloads reference Avro schemas from a registry.
Example:
YAML:
version: 3
systemUnderTest:
  service:
    definitions:
      - definition:
          source:
            filesystem:
              directory: api-specs
          specs:
            - order-service-async-avro-v3_0_0.yaml
    runOptions:
      asyncapi:
        type: test
        replyTimeout: 15000
        subscriberReadinessWaitTime: 3000
        schemaRegistry:
          kind: CONFLUENT
          url: ${SCHEMA_REGISTRY_URL:http://localhost:8085}
          username: admin
          password: admin-secret
        servers:
          - host: ${KAFKA_BROKER:localhost:9092}
            protocol: kafka
            adminCredentials:
              security.protocol: SASL_PLAINTEXT
              sasl.mechanism: PLAIN
              sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test-secret";
            client:
              producer:
                basic.auth.credentials.source: USER_INFO
                basic.auth.user.info: admin:admin-secret
              consumer:
                basic.auth.credentials.source: USER_INFO
                basic.auth.user.info: admin:admin-secret
JSON:

{
  "version": 3,
  "systemUnderTest": {
    "service": {
      "definitions": [
        {
          "definition": {
            "source": {
              "filesystem": {
                "directory": "api-specs"
              }
            },
            "specs": [
              "order-service-async-avro-v3_0_0.yaml"
            ]
          }
        }
      ],
      "runOptions": {
        "asyncapi": {
          "type": "test",
          "replyTimeout": 15000,
          "subscriberReadinessWaitTime": 3000,
          "schemaRegistry": {
            "kind": "CONFLUENT",
            "url": "${SCHEMA_REGISTRY_URL:http://localhost:8085}",
            "username": "admin",
            "password": "admin-secret"
          },
          "servers": [
            {
              "host": "${KAFKA_BROKER:localhost:9092}",
              "protocol": "kafka",
              "adminCredentials": {
                "security.protocol": "SASL_PLAINTEXT",
                "sasl.mechanism": "PLAIN",
                "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"test\" password=\"test-secret\";"
              },
              "client": {
                "producer": {
                  "basic.auth.credentials.source": "USER_INFO",
                  "basic.auth.user.info": "admin:admin-secret"
                },
                "consumer": {
                  "basic.auth.credentials.source": "USER_INFO",
                  "basic.auth.user.info": "admin:admin-secret"
                }
              }
            }
          ]
        }
      }
    }
  }
}
Notes:
- Use `adminCredentials` for broker-level settings such as `security.protocol`, `sasl.mechanism`, and `sasl.jaas.config`.
- Use `client.producer` and `client.consumer` for producer-specific and consumer-specific properties.
- Add `schemaRegistry` only when the AsyncAPI contract uses registry-backed schemas such as Avro.
- `replyTimeout` and `subscriberReadinessWaitTime` belong in async test configuration, not mock-only configuration.
API Server
The Kafka mock exposes a small HTTP API, by default on http://localhost:9999 (the port follows `API_SERVER_PORT` when set).
Typical flow:
- Start the mock.
- Capture a snapshot.
- Exercise your producer or service.
- Verify the channels or examples you care about.
- Stop the mock.
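The flow above can be scripted. The sketch below wraps the three endpoints with Python's standard library and encodes the rule (see API Rules below) that `/_specmatic/verify` takes exactly one of `channels` or `exampleIds`. The function names and the base URL are illustrative assumptions.

```python
import json
import urllib.request

BASE_URL = "http://localhost:9999"  # default API server port


def verify_url(base: str, channels=None, example_ids=None) -> str:
    """Build the /_specmatic/verify URL, enforcing the one-parameter rule."""
    if (channels is None) == (example_ids is None):
        raise ValueError("pass exactly one of channels or example_ids")
    if channels is not None:
        return f"{base}/_specmatic/verify?channels={','.join(channels)}"
    return f"{base}/_specmatic/verify?exampleIds={','.join(example_ids)}"


def snapshot(base: str = BASE_URL) -> None:
    # POST /_specmatic/snapshot records current counts for all known channels.
    urllib.request.urlopen(
        urllib.request.Request(f"{base}/_specmatic/snapshot", method="POST"))


def verify(base: str = BASE_URL, channels=None, example_ids=None) -> dict:
    # Returns a name-to-count mapping, e.g. {"place-order": 1}.
    with urllib.request.urlopen(verify_url(base, channels, example_ids)) as resp:
        return json.loads(resp.read())


def stop(base: str = BASE_URL) -> None:
    # POST /_specmatic/stop dumps reports and stops the async mock.
    urllib.request.urlopen(
        urllib.request.Request(f"{base}/_specmatic/stop", method="POST"))
```

A typical test then calls `snapshot()`, exercises the system, and asserts on the dictionary returned by `verify(channels=[...])`.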
OpenAPI summary:
openapi: 3.0.0
info:
  title: Specmatic Kafka Mock Server API
  version: 1.0.0
servers:
  - url: http://localhost:9999
paths:
  /_specmatic/snapshot:
    post:
      summary: Captures the current message-count snapshot for all known channels.
      responses:
        '200':
          description: Snapshot captured successfully
  /_specmatic/verify:
    get:
      summary: Verifies counts for the requested channels or example ids.
      parameters:
        - in: query
          name: channels
          required: false
          schema:
            type: string
        - in: query
          name: exampleIds
          required: false
          schema:
            type: string
      responses:
        '200':
          description: Verification counts
          content:
            application/json:
              schema:
                type: object
                additionalProperties:
                  type: integer
        '400':
          description: Invalid query parameter usage
        '404':
          description: No active snapshot exists for the requested channel
  /_specmatic/stop:
    post:
      summary: Dumps reports and stops the async mock.
      responses:
        '200':
          description: Stop initiated
Capture A Snapshot
curl -X POST http://localhost:9999/_specmatic/snapshot
This records the current count for all known channels. Verification is always done against messages received after the latest snapshot.
Verify By Channel
curl "http://localhost:9999/_specmatic/verify?channels=place-order,order-confirmed"
Example response:
{
  "place-order": 1,
  "order-confirmed": 1
}
Verify By Example Id
curl "http://localhost:9999/_specmatic/verify?exampleIds=success-case,invalid-order"
Example response:
{
  "success-case": 2,
  "invalid-order": 0
}
Stop The Mock
curl -X POST http://localhost:9999/_specmatic/stop
There is also a legacy endpoint:
curl -X POST http://localhost:9999/stop
/stop is deprecated and scheduled for removal on 2026-07-01. Use /_specmatic/stop.
API Rules
- `/_specmatic/verify` accepts exactly one query parameter.
- Supported query parameters are `channels` and `exampleIds`.
- Values are comma-separated.
- Unknown query parameter names return `400 Bad Request`.
- Verifying a channel before a snapshot exists for it returns `404 Not Found`.
A Simple Test Flow
# 1. Start the mock
docker run --network host -v "$(pwd):/usr/src/app" specmatic/enterprise mock
# 2. Capture the snapshot
curl -X POST http://localhost:9999/_specmatic/snapshot
# 3. Exercise your app or publish to Kafka
# 4. Verify the channels that should have received messages
curl "http://localhost:9999/_specmatic/verify?channels=place-order,order-confirmed"
# 5. Stop the mock
curl -X POST http://localhost:9999/_specmatic/stop