Kafka
When enabled, the Kafka transport consumes messages from Kafka servers. Each message is turned into a service request or a set of service requests.
Multiple consumers, topics and service calls per message can be configured by adding multiple consumer sections to your KIT file.
[settings.exts."omni.services.transport.server.kafka".consumers]
topics = ["foo", "bar"]
bootstrap_servers = ["localhost:9092"]
group_id = "my_group"
service_calls = ["test/ping"]
By default the following request model will be passed through to the endpoints:
from pydantic import BaseModel, Field

class KafkaModel(BaseModel):
    topic: str = Field("", title="Kafka Topic", description="The Kafka topic the request was consumed from")
    partition: int = Field(0, title="Kafka partition")
    offset: int = Field(0, title="Offset")
    key: str = Field("", title="Request key")
    value: dict = Field(..., title="Kafka value payload", description="Kafka value, holds the data")
    timestamp: str = Field("", title="Timestamp")
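The model can then be used in an endpoint in the following way:
# `router` is assumed to be the service's existing router object.
@router.post("/ping")
def ping(data: KafkaModel):
    # The consumed Kafka payload is available on `data.value`.
    print(data.value)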
Note
Currently the Kafka consumer only supports POST requests.
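As a rough illustration of how the default request model behaves, the sketch below builds a `KafkaModel` from a hand-written record. The `record` dictionary and the `is_valid` helper are hypothetical and exist only for this example; how the consumer itself assembles the request is not shown in this document. The point is that `value` is the only required field, while the others fall back to their defaults.

```python
from pydantic import BaseModel, Field, ValidationError


class KafkaModel(BaseModel):
    # Same fields as the default request model described above.
    topic: str = Field("", title="Kafka Topic")
    partition: int = Field(0, title="Kafka partition")
    offset: int = Field(0, title="Offset")
    key: str = Field("", title="Request key")
    value: dict = Field(..., title="Kafka value payload")
    timestamp: str = Field("", title="Timestamp")


# Hypothetical record, shaped like the fields of the model (assumption).
record = {
    "topic": "foo",
    "partition": 0,
    "offset": 42,
    "key": "k1",
    "value": {"msg": "hello"},
    "timestamp": "2024-01-01T00:00:00Z",
}

request = KafkaModel(**record)


def is_valid(payload: dict) -> bool:
    """Return True if the payload satisfies KafkaModel (hypothetical helper)."""
    try:
        KafkaModel(**payload)
    except ValidationError:
        return False
    return True
```

With this model, a payload that only carries `value` would validate, while one that omits `value` would be rejected.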
Message processing can be changed by overriding the _process_message method on the KafkaConsumer, which allows a custom Pydantic data model to be used.
A custom implementation can be selected through the exts."omni.services.transport.server.kafka".consumer_class setting.
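The overriding approach might look roughly like the sketch below. This is only an outline under stated assumptions: the `KafkaConsumer` class here is a local stand-in, and the signature and return value of `_process_message` are guesses, since neither is documented on this page. `OrderEvent` and `OrderConsumer` are hypothetical names.

```python
from types import SimpleNamespace
from typing import Any, Dict

from pydantic import BaseModel


class OrderEvent(BaseModel):
    # Hypothetical custom request model replacing the default one.
    order_id: str
    quantity: int = 1


class KafkaConsumer:
    # Stand-in for the extension's KafkaConsumer (assumption): the real
    # class and the real _process_message signature are not shown here.
    def _process_message(self, message: Any) -> Dict[str, Any]:
        return {"topic": message.topic, "value": message.value}


class OrderConsumer(KafkaConsumer):
    def _process_message(self, message: Any) -> Dict[str, Any]:
        # Validate the raw payload against the custom model, then return
        # the validated fields as the request body.
        event = OrderEvent(**message.value)
        return {"order_id": event.order_id, "quantity": event.quantity}


# Hypothetical message object with `topic` and `value` attributes.
message = SimpleNamespace(topic="foo", value={"order_id": "A1"})
body = OrderConsumer()._process_message(message)
```

In a real setup, the subclass would derive from the extension's own KafkaConsumer and be referenced by the consumer_class setting; the exact value format of that setting should be checked against the extension itself.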