Skip to main content

Kafka Commercial

About Kafka

Apache Kafka is a distributed event streaming platform built for durable, high-throughput messaging. In Specmatic, Kafka support is driven by your AsyncAPI contract:

  • AsyncAPI describes the topics, headers, payloads, and examples.
  • Specmatic starts a Kafka mock that subscribes to request topics from that contract.
  • When an incoming message matches the contract, Specmatic can publish the reply event defined in the same contract.
  • A small HTTP API lets you snapshot message counts and verify what happened during the test run.

Defining Kafka In AsyncAPI

Declare a Kafka server and attach it to the channels that use it.

servers:
kafkaServer:
host: "localhost:9092"
protocol: "kafka"
description: "Kafka broker"

channels:
NewOrderPlaced:
address: "new-orders"
servers:
- $ref: "#/servers/kafkaServer"
messages:
placeOrder.message:
$ref: "#/components/messages/OrderRequest"

components:
messages:
OrderRequest:
headers:
type: object
properties:
orderCorrelationId:
type: string
payload:
type: object
properties:
orderId:
type: string

Notes:

  • protocol: kafka tells Specmatic to use Kafka transport.
  • address is the Kafka topic name.
  • Message headers and payload schema are validated against the contract.

Start The Kafka Mock

Create specmatic.yaml:

specmatic.yaml
version: 3
dependencies:
services:
- service:
$ref: "#/components/services/kafkaService"
runOptions:
$ref: "#/components/runOptions/kafkaServiceMock"
components:
sources:
yourContracts:
git:
url: https://your-central-contract-repo.com
services:
kafkaService:
description: Kafka AsyncAPI Service
definitions:
- definition:
source:
$ref: "#/components/sources/yourContracts"
specs:
- spec:
path: /path/to/your/kafka.yaml
runOptions:
kafkaServiceMock:
asyncapi:
type: mock
inMemoryBroker:
host: localhost
port: 9092
servers:
- host: localhost:9092
protocol: kafka

This config tells Specmatic where to load the AsyncAPI contract from and how to run the async mock.

For the full AsyncAPI configuration fields used here, see Test Configuration and Mock Configuration.

Start the mock with the embedded in-memory Kafka broker:

docker run --network host -v "$(pwd):/usr/src/app" specmatic/enterprise mock

Typical logs:

Setting up listeners
Listening on topics: (product-queries)
Starting API server on port: 9999

If you want Specmatic to use an existing Kafka broker instead of the embedded one:

docker run --network host -v "$(pwd):/usr/src/app" specmatic/enterprise --external-broker-url localhost:9092 mock

If you want the HTTP API on a different port, set API_SERVER_PORT:

docker run --network host \
-e API_SERVER_PORT=3000 \
-v "$(pwd):/usr/src/app" \
specmatic/enterprise mock

--network host is important when using localhost for the broker address because it allows the container to reach services on the host directly.

Kafka Configuration In specmatic.yaml

For Kafka, the servers entries in runOptions.asyncapi map directly to the async config classes:

  • servers[].host and servers[].protocol select the broker
  • servers[].adminCredentials carries Kafka admin client properties
  • servers[].client.producer and servers[].client.consumer carry Kafka client properties
  • schemaRegistry is used when your AsyncAPI payloads reference Avro schemas from a registry

Example:

specmatic.yaml
version: 3
systemUnderTest:
service:
definitions:
- definition:
source:
filesystem:
directory: api-specs
specs:
- order-service-async-avro-v3_0_0.yaml
runOptions:
asyncapi:
type: test
replyTimeout: 15000
subscriberReadinessWaitTime: 3000
schemaRegistry:
kind: CONFLUENT
url: ${SCHEMA_REGISTRY_URL:http://localhost:8085}
username: admin
password: admin-secret
servers:
- host: ${KAFKA_BROKER:localhost:9092}
protocol: kafka
adminCredentials:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test-secret";
client:
producer:
basic.auth.credentials.source: USER_INFO
basic.auth.user.info: admin:admin-secret
consumer:
basic.auth.credentials.source: USER_INFO
basic.auth.user.info: admin:admin-secret

Notes:

  • Use adminCredentials for broker-level settings such as security.protocol, sasl.mechanism, and sasl.jaas.config.
  • Use client.producer and client.consumer for producer-specific and consumer-specific properties.
  • Add schemaRegistry only when the AsyncAPI contract uses registry-backed schemas such as Avro.
  • replyTimeout and subscriberReadinessWaitTime belong in async test configuration, not mock-only configuration.

API Server

The Kafka mock exposes a small HTTP API on http://localhost:9999.

Typical flow:

  1. Start the mock.
  2. Capture a snapshot.
  3. Exercise your producer or service.
  4. Verify the channels or examples you care about.
  5. Stop the mock.

OpenAPI summary:

openapi: 3.0.0
info:
title: Specmatic Kafka Mock Server API
version: 1.0.0
servers:
- url: http://localhost:9999
paths:
/_specmatic/snapshot:
post:
summary: Captures the current message-count snapshot for all known channels.
responses:
'200':
description: Snapshot captured successfully
/_specmatic/verify:
get:
summary: Verifies counts for the requested channels or example ids.
parameters:
- in: query
name: channels
required: false
schema:
type: string
- in: query
name: exampleIds
required: false
schema:
type: string
responses:
'200':
description: Verification counts
content:
application/json:
schema:
type: object
additionalProperties:
type: integer
'400':
description: Invalid query parameter usage
'404':
description: No active snapshot exists for the requested channel
/_specmatic/stop:
post:
summary: Dumps reports and stops the async mock.
responses:
'200':
description: Stop initiated

Capture A Snapshot

curl -X POST http://localhost:9999/_specmatic/snapshot

This records the current count for all known channels. Verification is always done against messages received after the latest snapshot.

Verify By Channel

curl "http://localhost:9999/_specmatic/verify?channels=place-order,order-confirmed"

Example response:

{
"place-order": 1,
"order-confirmed": 1
}

Verify By Example Id

curl "http://localhost:9999/_specmatic/verify?exampleIds=success-case,invalid-order"

Example response:

{
"success-case": 2,
"invalid-order": 0
}

Stop The Mock

curl -X POST http://localhost:9999/_specmatic/stop

There is also a legacy endpoint:

curl -X POST http://localhost:9999/stop

/stop is deprecated and scheduled for removal on 2026-07-01. Use /_specmatic/stop.

API Rules

  • /_specmatic/verify accepts exactly one query parameter.
  • Supported query parameters are channels and exampleIds.
  • Values are comma-separated.
  • Unknown query parameter names return 400 Bad Request.
  • Verifying a channel before a snapshot exists for it returns 404 Not Found.

A Simple Test Flow

# 1. Start the mock
docker run --network host -v "$(pwd):/usr/src/app" specmatic/enterprise mock

# 2. Capture the snapshot
curl -X POST http://localhost:9999/_specmatic/snapshot

# 3. Exercise your app or publish to Kafka

# 4. Verify the channels that should have received messages
curl "http://localhost:9999/_specmatic/verify?channels=place-order,order-confirmed"

# 5. Stop the mock
curl -X POST http://localhost:9999/_specmatic/stop