from __future__ import annotations
from typing import Generator, Optional, Dict, Any, Union, List
import random
import asyncio
import base64
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from ..typing import AsyncResult, Messages
from ..requests import StreamSession, raise_for_status
from ..errors import ResponseError
from ..image import ImageResponse
class ReplicateHome(AsyncGeneratorProvider, ProviderModelMixin):
url = "https://replicate.com"
parent = "Replicate"
working = True
default_model = 'stability-ai/sdxl'
models = [
# image
'stability-ai/sdxl',
'ai-forever/kandinsky-2.2',
# text
'meta/llama-2-70b-chat',
'mistralai/mistral-7b-instruct-v0.2'
]
versions = {
# image
'stability-ai/sdxl': [
"39ed52f2a78e934b3ba6e2a89f5b1c712de7dfea535525255b1aa35c5565e08b",
"2b017d9b67edd2ee1401238df49d75da53c523f36e363881e057f5dc3ed3c5b2",
"7762fd07cf82c948538e41f63f77d685e02b063e37e496e96eefd46c929f9bdc"
],
'ai-forever/kandinsky-2.2': [
"ad9d7879fbffa2874e1d909d1d37d9bc682889cc65b31f7bb00d2362619f194a"
],
# Text
'meta/llama-2-70b-chat': [
"dp-542693885b1777c98ef8c5a98f2005e7"
],
'mistralai/mistral-7b-instruct-v0.2': [
"dp-89e00f489d498885048e94f9809fbc76"
]
}
image_models = {"stability-ai/sdxl", "ai-forever/kandinsky-2.2"}
text_models = {"meta/llama-2-70b-chat", "mistralai/mistral-7b-instruct-v0.2"}
@classmethod
async def create_async_generator(
cls,
model: str,
messages: Messages,
**kwargs: Any
) -> Generator[Union[str, ImageResponse], None, None]:
yield await cls.create_async(messages[-1]["content"], model, **kwargs)
@classmethod
async def create_async(
cls,
prompt: str,
model: str,
api_key: Optional[str] = None,
proxy: Optional[str] = None,
timeout: int = 180,
version: Optional[str] = None,
extra_data: Dict[str, Any] = {},
**kwargs: Any
) -> Union[str, ImageResponse]:
headers = {
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'en-US',
'Connection': 'keep-alive',
'Origin': cls.url,
'Referer': f'{cls.url}/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
'sec-ch-ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"macOS"',
}
if version is None:
version = random.choice(cls.versions.get(model, []))
if api_key is not None:
headers["Authorization"] = f"Bearer {api_key}"
async with StreamSession(
proxies={"all": proxy},
headers=headers,
timeout=timeout
) as session:
data = {
"input": {
"prompt": prompt,
**extra_data
},
"version": version
}
if api_key is None:
data["model"] = cls.get_model(model)
url = "https://homepage.replicate.com/api/prediction"
else:
url = "https://api.replicate.com/v1/predictions"
async with session.post(url, json=data) as response:
await raise_for_status(response)
result = await response.json()
if "id" not in result:
raise ResponseError(f"Invalid response: {result}")
while True:
if api_key is None:
url = f"https://homepage.replicate.com/api/poll?id={result['id']}"
else:
url = f"https://api.replicate.com/v1/predictions/{result['id']}"
async with session.get(url) as response:
await raise_for_status(response)
result = await response.json()
if "status" not in result:
raise ResponseError(f"Invalid response: {result}")
if result["status"] == "succeeded":
output = result['output']
if model in cls.text_models:
return ''.join(output) if isinstance(output, list) else output
elif model in cls.image_models:
images: List[Any] = output
images = images[0] if len(images) == 1 else images
return ImageResponse(images, prompt)
elif result["status"] == "failed":
raise ResponseError(f"Prediction failed: {result}")
await asyncio.sleep(0.5)