=====
Kafka
=====

When enabled, this transport consumes messages from Kafka servers. Each message is turned into a service request or a set of service requests.

Multiple consumers, topics and service calls per message can be configured by adding multiple consumer sections to your KIT file.

.. code-block:: toml
    :linenos:

    [settings.exts."omni.services.transport.server.kafka".consumers]
    topics = ["foo", "bar"]
    bootstrap_servers = ["localhost:9092"]
    group_id = "my_group"
    service_calls = ["test/ping"]

By default, the following request model is passed through to the endpoints:

.. code-block:: python
    :linenos:

    from pydantic import BaseModel, Field


    class KafkaModel(BaseModel):
        topic: str = Field("", title="Kafka Topic", description="The Kafka topic the request was consumed from")
        partition: int = Field(0, title="Kafka partition")
        offset: int = Field(0, title="Offset")
        key: str = Field("", title="Request key")
        # ``value`` is the only required field (``...`` marks it as required).
        value: dict = Field(..., title="Kafka value payload", description="Kafka value, holds the data")
        timestamp: str = Field("", title="Timestamp")

The model can then be used in an endpoint as follows:

.. code-block:: python
    :linenos:

    # ``router`` is the service router of your extension and
    # ``KafkaModel`` is the request model shown above.
    @router.post("/ping")
    def ping(data: KafkaModel):
        print(data.value)

.. note::
    Currently the Kafka consumer only supports POST requests.

Message processing can be changed by overriding the ``_process_message`` method of the ``KafkaConsumer`` class, which makes it possible to use a custom Pydantic data model.
To use a custom implementation, set the ``exts."omni.services.transport.server.kafka".consumer_class`` class setting.
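
To make the mapping from one consumed message to a set of service requests more concrete, the following is a minimal, standard-library-only sketch. It is not the extension's implementation: ``Record`` and ``build_requests`` are hypothetical names introduced for illustration, the field names simply mirror ``KafkaModel``, and it assumes that the message value is a JSON-encoded object and that the key is UTF-8 bytes or absent.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for a consumed Kafka record; the field names mirror KafkaModel.
Record = namedtuple("Record", "topic partition offset key value timestamp")


def build_requests(record, service_calls):
    """Return one (path, payload) pair per configured service call."""
    payload = {
        "topic": record.topic,
        "partition": record.partition,
        "offset": record.offset,
        # Assumption: keys are UTF-8 encoded bytes, or missing.
        "key": record.key.decode("utf-8") if record.key else "",
        # Assumption: the value is a JSON-encoded object, matching ``value: dict``.
        "value": json.loads(record.value),
        "timestamp": str(record.timestamp),
    }
    # Each service call gets its own copy of the payload, to be sent as a POST body.
    return [("/" + call.lstrip("/"), dict(payload)) for call in service_calls]


record = Record("foo", 0, 42, b"k1", b'{"hello": "world"}', 1700000000000)
pending = build_requests(record, ["test/ping", "test/log"])
for path, payload in pending:
    print(path, payload["value"])
```

With two entries in ``service_calls``, a single message yields two requests carrying the same payload. A custom ``_process_message`` override would be the place to change how that payload is built, for example to populate a different Pydantic model.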