KafkaΒΆ

When enabled will allow the consuming of messages from Kafka servers. Each message will be turned into a service request or a set of service requests.

It is possible to configure multiple consumers, topics and service calls per message.

Simply add multiple consumer sections to your KIT file.

1
2
3
4
5
[settings.exts."omni.services.transport.server.kafka".consumers]
topics = ["foo", "bar"]
bootstrap_servers = ["localhost:9092"]
group_id = "my_group"
service_calls = ["test/ping"]

By default the following request model will be passed through to the endpoints:

1
2
3
4
5
6
7
8
9
from pydantic import BaseModel, Field

class KafkaModel(BaseModel):
    topic: str = Field("", title="Kafka Topic", description="The Kafka topic the request was consumed from")
    partition: int = Field(0, title="Kafka partition")
    offset: int = Field(0, title="Offset")
    key: str = Field("", title="Request key")
    value: dict = Field(..., title="Kafka value payload", description="Kafka value, holds the data")
    timestamp: str = Field("", title="Timestamp")

Which can then be used in the following way:

1
2
3
@router.post("/ping")
def ping(data: KafkaModel):
    print(data.value)

Note

Currently the Kafka consumer only supports POST requests.

It is possible to change the message processing by overriding the _process_message method on the KafkaConsumer which would allow to use a custom Pydantic data model. A custom implementation can be used by setting the exts."omni.services.transport.server.kafka".consumer_class class setting.