Kafka (KRaft)¶
Since v0.24.0
Introduction¶
The Testcontainers module for KRaft: Apache Kafka Without ZooKeeper.
Adding this module to your project dependencies¶
Please run the following command to add the Kafka module to your Go dependencies:
go get github.com/testcontainers/testcontainers-go/modules/kafka
Usage example¶
ctx := context.Background()
kafkaContainer, err := kafka.Run(ctx,
    "confluentinc/confluent-local:7.5.0",
    kafka.WithClusterID("test-cluster"),
)
defer func() {
    if err := testcontainers.TerminateContainer(kafkaContainer); err != nil {
        log.Printf("failed to terminate container: %s", err)
    }
}()
if err != nil {
    log.Printf("failed to start container: %s", err)
    return
}
Module Reference¶
Run function¶
- Since v0.32.0
Info
The RunContainer(ctx, opts...) function is deprecated and will be removed in the next major release of Testcontainers for Go.
The Kafka module exposes one entrypoint function to create the Kafka container, and this function receives three parameters:
func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error)
- context.Context, the Go context.
- string, the Docker image to use.
- testcontainers.ContainerCustomizer, a variadic argument for passing options.
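The variadic options argument follows Go's functional-options pattern. The sketch below imitates that shape using only the standard library, so it can be run without Docker. The names `request`, `customizer`, `withEnv`, and `run` are hypothetical stand-ins rather than the module's actual types, and `KAFKA_AUTO_CREATE_TOPICS_ENABLE` is only an illustrative value.

```go
package main

import "fmt"

// request is a hypothetical stand-in for the container request that options modify.
type request struct {
	Image string
	Env   map[string]string
}

// customizer is a hypothetical stand-in for testcontainers.ContainerCustomizer.
type customizer func(req *request) error

// withEnv returns a customizer that merges the given variables into the request.
func withEnv(env map[string]string) customizer {
	return func(req *request) error {
		for k, v := range env {
			req.Env[k] = v
		}
		return nil
	}
}

// run mirrors the shape of Run: an image, then zero or more options
// applied in the order in which they were passed.
func run(img string, opts ...customizer) (*request, error) {
	req := &request{Image: img, Env: map[string]string{"KAFKA_NODE_ID": "1"}}
	for _, opt := range opts {
		if err := opt(req); err != nil {
			return nil, fmt.Errorf("customize: %w", err)
		}
	}
	return req, nil
}

func main() {
	req, err := run("confluentinc/confluent-local:7.5.0",
		withEnv(map[string]string{"KAFKA_AUTO_CREATE_TOPICS_ENABLE": "true"}),
	)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(req.Image, len(req.Env))
}
```

In this sketch, later options overwrite keys set by earlier ones because they are applied in order.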
Image¶
Use the second argument in the Run function to set a valid Docker image.
For example: Run(context.Background(), "confluentinc/confluent-local:7.5.0").
Warning
The minimum required image version for KRaft mode is confluentinc/confluent-local:7.4.0. If you use an image other than
the official one, make sure that it is compatible with KRaft mode, because the module won't check
the version for you.
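If you want to guard against an unsupported tag in your own test setup, a check along these lines is one option. It is a minimal sketch that assumes the image tag is a plain MAJOR.MINOR.PATCH Confluent version; `parseTag` and `supportsKRaft` are hypothetical helpers, not part of the module, and images with a different versioning scheme need their own check.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// minKRaft is the minimum confluent-local version named in the warning above.
var minKRaft = [3]int{7, 4, 0}

// parseTag extracts a MAJOR.MINOR.PATCH version from an image reference
// such as "confluentinc/confluent-local:7.5.0".
func parseTag(image string) ([3]int, error) {
	var v [3]int
	i := strings.LastIndex(image, ":")
	// A colon followed by a slash belongs to a registry port, not a tag.
	if i < 0 || strings.Contains(image[i+1:], "/") {
		return v, fmt.Errorf("image %q has no tag", image)
	}
	tag := image[i+1:]
	parts := strings.Split(tag, ".")
	if len(parts) != 3 {
		return v, fmt.Errorf("tag %q is not MAJOR.MINOR.PATCH", tag)
	}
	for j, p := range parts {
		n, err := strconv.Atoi(p)
		if err != nil {
			return v, fmt.Errorf("tag %q: %w", tag, err)
		}
		v[j] = n
	}
	return v, nil
}

// supportsKRaft reports whether the tagged version is at least 7.4.0.
func supportsKRaft(image string) (bool, error) {
	v, err := parseTag(image)
	if err != nil {
		return false, err
	}
	for j := range v {
		if v[j] != minKRaft[j] {
			return v[j] > minKRaft[j], nil
		}
	}
	return true, nil
}

func main() {
	ok, err := supportsKRaft("confluentinc/confluent-local:7.5.0")
	fmt.Println(ok, err)
}
```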
Environment variables¶
The environment variables that are already set by default are:
"KAFKA_LISTENERS":                                "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_REST_BOOTSTRAP_SERVERS":                   "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP":           "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT",
"KAFKA_INTER_BROKER_LISTENER_NAME":               "BROKER",
"KAFKA_BROKER_ID":                                "1",
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR":         "1",
"KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS":             "1",
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1",
"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR":            "1",
"KAFKA_LOG_FLUSH_INTERVAL_MESSAGES":              strconv.Itoa(math.MaxInt),
"KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS":         "0",
"KAFKA_NODE_ID":                                  "1",
"KAFKA_PROCESS_ROLES":                            "broker,controller",
"KAFKA_CONTROLLER_LISTENER_NAMES":                "CONTROLLER",
Init script¶
The Kafka container will be started using a custom shell script:
    starterScriptContent = `#!/bin/bash
source /etc/confluent/docker/bash-config
export KAFKA_ADVERTISED_LISTENERS=%s,BROKER://%s:9092
echo Starting Kafka KRaft mode
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
echo '' > /etc/confluent/docker/ensure
/etc/confluent/docker/configure
/etc/confluent/docker/launch`
⋯
        return fmt.Errorf("copy to container: %w", err)
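The two %s placeholders in the script's export line are filled in before the script is copied into the container. The sketch below shows that substitution in isolation. It assumes that the first placeholder receives the externally reachable PLAINTEXT endpoint and the second the container's hostname; `renderAdvertised` and the sample values are hypothetical.

```go
package main

import "fmt"

// advertisedLine mirrors the export line of the starter script shown above.
const advertisedLine = "export KAFKA_ADVERTISED_LISTENERS=%s,BROKER://%s:9092"

// renderAdvertised fills both placeholders.
// Assumption: publicEndpoint is the mapped PLAINTEXT endpoint seen from the host,
// and hostname is the container's hostname used for broker-to-broker traffic.
func renderAdvertised(publicEndpoint, hostname string) string {
	return fmt.Sprintf(advertisedLine, publicEndpoint, hostname)
}

func main() {
	// Sample values for illustration only.
	fmt.Println(renderAdvertised("PLAINTEXT://localhost:55012", "3f2a9c1d7b6e"))
}
```

Because the mapped host port is only known once the container exists, a substitution like this has to happen after creation, which is consistent with the script being copied into the container rather than baked into the image.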
Container Options¶
When starting the Kafka container, you can pass options in a variadic way to configure it.
The following options are exposed by the testcontainers package.
Basic Options¶
- WithExposedPorts (Since v0.37.0)
- WithEnv (Since v0.29.0)
- WithWaitStrategy (Since v0.20.0)
- WithAdditionalWaitStrategy (Since v0.38.0)
- WithWaitStrategyAndDeadline (Since v0.20.0)
- WithAdditionalWaitStrategyAndDeadline (Since v0.38.0)
- WithEntrypoint (Since v0.37.0)
- WithEntrypointArgs (Since v0.37.0)
- WithCmd (Since v0.37.0)
- WithCmdArgs (Since v0.37.0)
- WithLabels (Since v0.37.0)
Lifecycle Options¶
- WithLifecycleHooks (Since v0.38.0)
- WithAdditionalLifecycleHooks (Since v0.38.0)
- WithStartupCommand (Since v0.25.0)
- WithAfterReadyCommand (Since v0.28.0)
Files & Mounts Options¶
- WithFiles (Since v0.37.0)
- WithMounts (Since v0.37.0)
- WithTmpfs (Since v0.37.0)
- WithImageMount (Since v0.37.0)
Build Options¶
- WithDockerfile (Since v0.37.0)
Logging Options¶
- WithLogConsumers (Since v0.28.0)
- WithLogConsumerConfig (Since v0.38.0)
- WithLogger (Since v0.29.0)
Image Options¶
- WithAlwaysPull (Since v0.38.0)
- WithImageSubstitutors (Since v0.26.0)
- WithImagePlatform (Since v0.38.0)
Networking Options¶
- WithNetwork (Since v0.27.0)
- WithNetworkByName (Since v0.38.0)
- WithBridgeNetwork (Since v0.38.0)
- WithNewNetwork (Since v0.27.0)
Advanced Options¶
- WithHostPortAccess (Since v0.31.0)
- WithConfigModifier (Since v0.20.0)
- WithHostConfigModifier (Since v0.20.0)
- WithEndpointSettingsModifier (Since v0.20.0)
- CustomizeRequest (Since v0.20.0)
- WithName (Since v0.38.0)
- WithNoStart (Since v0.38.0)
- WithProvider (Not available until the next release; currently on main)
Experimental Options¶
- WithReuseByName (Since v0.37.0)
Container Methods¶
The Kafka container exposes the following methods:
Brokers¶
- Since v0.24.0
The Brokers(ctx) method returns the Kafka broker addresses as a string slice. Each address combines the container host with the random host port mapped to Kafka's public port (9093/tcp).
brokers, err := kafkaContainer.Brokers(ctx)
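Some clients want the host and port separately rather than as one address. The sketch below splits a single broker address with the standard library. It assumes each entry has the form host:port; `splitBroker` is a hypothetical helper, and "localhost:55012" is a sample value standing in for an element of the returned slice.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// splitBroker separates a "host:port" broker address and validates the port range.
func splitBroker(addr string) (string, int, error) {
	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return "", 0, fmt.Errorf("split %q: %w", addr, err)
	}
	port, err := strconv.Atoi(portStr)
	if err != nil || port < 1 || port > 65535 {
		return "", 0, fmt.Errorf("invalid port %q in %q", portStr, addr)
	}
	return host, port, nil
}

func main() {
	// Sample address; in a test this would come from kafkaContainer.Brokers(ctx).
	host, port, err := splitBroker("localhost:55012")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(host, port)
}
```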