Kafka#

When enabled will allow the consuming of messages from Kafka servers. Each message will be turned into a service request or a set of service requests.

It is possible to configure multiple consumers, topics and service calls per message.

Simply add multiple consumer sections to your KIT file.

1[settings.exts."omni.services.transport.server.kafka".consumers]
2topics = ["foo", "bar"]
3bootstrap_servers = ["localhost:9092"]
4group_id = "my_group"
5service_calls = ["test/ping"]

By default the following request model will be passed through to the endpoints:

1from pydantic import BaseModel, Field
2
3class KafkaModel(BaseModel):
4    topic: str = Field("", title="Kafka Topic", description="The Kafka topic the request was consumed from")
5    partition: int = Field(0, title="Kafka partition")
6    offset: int = Field(0, title="Offset")
7    key: str = Field("", title="Request key")
8    value: dict = Field(..., title="Kafka value payload", description="Kafka value, holds the data")
9    timestamp: str = Field("", title="Timestamp")

Which can then be used in the following way:

1@router.post("/ping")
2def ping(data: KafkaModel):
3    print(data.value)

Note

Currently the Kafka consumer only supports POST requests.

It is possible to change the message processing by overriding the _process_message method on the KafkaConsumer which would allow to use a custom Pydantic data model. A custom implementation can be used by setting the exts."omni.services.transport.server.kafka".consumer_class class setting.