Kafka
When enabled, this extension consumes messages from Kafka servers. Each message is turned into a service request or a set of service requests.
It is possible to configure multiple consumers, topics and service calls per message.
Simply add multiple consumer sections to your KIT file.
[settings.exts."omni.services.transport.server.kafka".consumers]
topics = ["foo", "bar"]
bootstrap_servers = ["localhost:9092"]
group_id = "my_group"
service_calls = ["test/ping"]
By default the following request model will be passed through to the endpoints:
from pydantic import BaseModel, Field


class KafkaModel(BaseModel):
    topic: str = Field("", title="Kafka Topic", description="The Kafka topic the request was consumed from")
    partition: int = Field(0, title="Kafka partition")
    offset: int = Field(0, title="Offset")
    key: str = Field("", title="Request key")
    value: dict = Field(..., title="Kafka value payload", description="Kafka value, holds the data")
    timestamp: str = Field("", title="Timestamp")
The model can then be used in an endpoint in the following way:
@router.post("/ping")
def ping(data: KafkaModel):
    print(data.value)
Note
Currently the Kafka consumer only supports POST requests.
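To make the mapping from one Kafka message to one or more service requests more concrete, the sketch below builds a payload with the same fields as the default request model and pairs it with each configured service call. It is only an illustration using the standard library, not the extension's actual implementation: `Record`, `record_to_requests`, and the `test/audit` endpoint are hypothetical names, and it assumes the message value is a UTF-8 encoded JSON object.

```python
import json
from typing import Any, Dict, List, NamedTuple, Optional, Tuple


class Record(NamedTuple):
    """Hypothetical stand-in for a record consumed from Kafka."""
    topic: str
    partition: int
    offset: int
    key: Optional[bytes]
    value: bytes
    timestamp: int  # milliseconds since the epoch


def record_to_requests(
    record: Record, service_calls: List[str]
) -> List[Tuple[str, Dict[str, Any]]]:
    """Return one (endpoint, payload) pair per configured service call."""
    payload = {
        "topic": record.topic,
        "partition": record.partition,
        "offset": record.offset,
        "key": record.key.decode("utf-8") if record.key else "",
        # Assumption: the value is a UTF-8 encoded JSON object.
        "value": json.loads(record.value.decode("utf-8")),
        "timestamp": str(record.timestamp),
    }
    # Each service call receives its own shallow copy of the payload.
    return [(call, dict(payload)) for call in service_calls]


record = Record("foo", 0, 42, b"user-1", b'{"hello": "world"}', 1700000000000)
requests = record_to_requests(record, ["test/ping", "test/audit"])
for endpoint, body in requests:
    print(endpoint, body["value"])
```

With two entries in `service_calls`, a single consumed message yields two payloads, each of which would be sent as a POST request to its endpoint.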
Message processing can be changed by overriding the _process_message method on the KafkaConsumer, which makes it possible to use a custom Pydantic data model. To use a custom implementation, set the exts."omni.services.transport.server.kafka".consumer_class setting to the custom class.
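As a rough sketch of what such an override could look like, the example below subclasses a consumer and returns a custom Pydantic model from `_process_message`. Several things here are assumptions rather than the extension's documented API: the `KafkaConsumer` class is a local stand-in (the real import path is not given here), the signature and return type of `_process_message` are guessed and the real method may, for example, be asynchronous, and `OrderEvent` and `OrderConsumer` are hypothetical names.

```python
from pydantic import BaseModel, Field


class OrderEvent(BaseModel):
    """Hypothetical custom request model."""
    order_id: str = Field(..., title="Order ID")
    quantity: int = Field(1, title="Quantity")


class KafkaConsumer:
    """Local stand-in for the extension's consumer class.

    The real class and the signature of `_process_message` may differ.
    """

    def _process_message(self, message: dict) -> BaseModel:
        raise NotImplementedError


class OrderConsumer(KafkaConsumer):
    def _process_message(self, message: dict) -> OrderEvent:
        # Assumption: message["value"] is already a decoded dict.
        return OrderEvent(**message["value"])


event = OrderConsumer()._process_message(
    {"value": {"order_id": "A-17", "quantity": 3}}
)
print(event.order_id, event.quantity)
```

An endpoint receiving requests from such a consumer would then declare `OrderEvent` instead of `KafkaModel` as its request body type.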