SimpleSpec
- pydantic model asyncapi_container.containers.v3.simple_spec.SimpleSpecV3
A simplified mapper for the AsyncAPI specification. You don't need to understand or use the whole AsyncAPI specification; you can focus only on producers and consumers.
- Config:
arbitrary_types_allowed: bool = True
- Fields:
- field info: Info = Info(title='My Project', version='0.0.1', description='')
- field receives: Dict[str | BaseModel, List[Type[BaseModel]]] = {}
What your service can receive, acting as a consumer.
- field sends: Dict[str | BaseModel, List[Type[BaseModel]]] = {}
What your service can send, acting as a producer.
- class Config
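As a rough illustration of the field types listed above, sends and receives both map a channel name to a list of pydantic model classes. The sketch below uses only pydantic and typing to show that shape. The model PaymentReceivedV1, the channel name "shop.payments.v1", and the helper describe are hypothetical names made up for this example and are not part of asyncapi_container.

```python
from typing import Dict, List, Type

from pydantic import BaseModel


class PaymentReceivedV1(BaseModel):
    # Hypothetical message schema, used only for illustration.
    order_id: int
    amount_cents: int


# Same shape as the `receives` / `sends` fields: channel name -> message models.
# The channel name is an assumption made for this sketch.
receives: Dict[str, List[Type[BaseModel]]] = {
    "shop.payments.v1": [PaymentReceivedV1],
}


def describe(routes: Dict[str, List[Type[BaseModel]]]) -> List[str]:
    """List each channel together with the names of the models routed to it."""
    return [
        f"{channel}: {', '.join(model.__name__ for model in models)}"
        for channel, models in routes.items()
    ]


print(describe(receives))
```

Note that the values are model classes, not instances, which matches the List[Type[BaseModel]] annotation on both fields.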
Define a pydantic model and include it in a list under sends or receives:
from asyncapi_container.asyncapi.spec.v3.info import Info
from asyncapi_container.containers.v3.simple_spec import SimpleSpecV3
from asyncapi_container.custom_types import RoutingMap
from pydantic import BaseModel, Field


class Customer(BaseModel):
    first_name: str = Field(..., title='First Name')
    last_name: str = Field(..., title='Last Name')
    email: str = Field(..., title='Email')
    country: str = Field(..., title='Country')
    zipcode: str = Field(..., title='Zipcode')
    city: str = Field(..., title='City')
    street: str = Field(..., title='Street')
    apartment: str = Field(..., title='Apartment')


class OrderSchemaV1(BaseModel):
    product_id: int = Field(..., title='Product Id')
    quantity: int = Field(..., title='Quantity')
    customer: Customer


class MySpecialServiceAsyncAPISpecV3(SimpleSpecV3):
    info: Info = Info(
        title="My special Service",
        version="1.0.0",
        description="Service for making orders"
    )
    sends: RoutingMap = {
        "shop.orders.v1": [
            OrderSchemaV1,
        ]
    }
    receives: RoutingMap = {}
Result: