from __future__ import annotations
import asyncio
import time
import json
from aiohttp import ClientSession, BaseConnector
from urllib.parse import quote
from typing import List, Dict
try:
from bs4 import BeautifulSoup
has_requirements = True
except ImportError:
has_requirements = False
from ..helper import get_connector
from ...errors import MissingRequirementsError, RateLimitError
from ...webdriver import WebDriver, get_driver_cookies, get_browser
BING_URL = "https://www.bing.com"
TIMEOUT_LOGIN = 1200
TIMEOUT_IMAGE_CREATION = 300
ERRORS = [
"this prompt is being reviewed",
"this prompt has been blocked",
"we're working hard to offer image creator in more languages",
"we can't create your images right now"
]
BAD_IMAGES = [
"https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
"https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
]
def wait_for_login(driver: WebDriver, timeout: int = TIMEOUT_LOGIN) -> None:
"""
Waits for the user to log in within a given timeout period.
Args:
driver (WebDriver): Webdriver for browser automation.
timeout (int): Maximum waiting time in seconds.
Raises:
RuntimeError: If the login process exceeds the timeout.
"""
driver.get(f"{BING_URL}/")
start_time = time.time()
while not driver.get_cookie("_U"):
if time.time() - start_time > timeout:
raise RuntimeError("Timeout error")
time.sleep(0.5)
def get_cookies_from_browser(proxy: str = None) -> dict[str, str]:
"""
Retrieves cookies from the browser using webdriver.
Args:
proxy (str, optional): Proxy configuration.
Returns:
dict[str, str]: Retrieved cookies.
"""
with get_browser(proxy=proxy) as driver:
wait_for_login(driver)
time.sleep(1)
return get_driver_cookies(driver)
def create_session(cookies: Dict[str, str], proxy: str = None, connector: BaseConnector = None) -> ClientSession:
"""
Creates a new client session with specified cookies and headers.
Args:
cookies (Dict[str, str]): Cookies to be used for the session.
Returns:
ClientSession: The created client session.
"""
headers = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept-encoding": "gzip, deflate, br",
"accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh-TW;q=0.7,zh;q=0.6",
"content-type": "application/x-www-form-urlencoded",
"referrer-policy": "origin-when-cross-origin",
"referrer": "https://www.bing.com/images/create/",
"origin": "https://www.bing.com",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.54",
"sec-ch-ua": "\"Microsoft Edge\";v=\"111\", \"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"111\"",
"sec-ch-ua-mobile": "?0",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "same-origin",
"sec-fetch-user": "?1",
"upgrade-insecure-requests": "1",
}
if cookies:
headers["Cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items())
return ClientSession(headers=headers, connector=get_connector(connector, proxy))
async def create_images(session: ClientSession, prompt: str, timeout: int = TIMEOUT_IMAGE_CREATION) -> List[str]:
"""
Creates images based on a given prompt using Bing's service.
Args:
session (ClientSession): Active client session.
prompt (str): Prompt to generate images.
proxy (str, optional): Proxy configuration.
timeout (int): Timeout for the request.
Returns:
List[str]: A list of URLs to the created images.
Raises:
RuntimeError: If image creation fails or times out.
"""
if not has_requirements:
raise MissingRequirementsError('Install "beautifulsoup4" package')
url_encoded_prompt = quote(prompt)
payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE"
url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
async with session.post(url, allow_redirects=False, data=payload, timeout=timeout) as response:
response.raise_for_status()
text = (await response.text()).lower()
if "0 coins available" in text:
raise RateLimitError("No coins left. Log in with a different account or wait a while")
for error in ERRORS:
if error in text:
raise RuntimeError(f"Create images failed: {error}")
if response.status != 302:
url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
async with session.post(url, allow_redirects=False, timeout=timeout) as response:
if response.status != 302:
raise RuntimeError(f"Create images failed. Code: {response.status}")
redirect_url = response.headers["Location"].replace("&nfy=1", "")
redirect_url = f"{BING_URL}{redirect_url}"
request_id = redirect_url.split("id=")[1]
async with session.get(redirect_url) as response:
response.raise_for_status()
polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
start_time = time.time()
while True:
if time.time() - start_time > timeout:
raise RuntimeError(f"Timeout error after {timeout} sec")
async with session.get(polling_url) as response:
if response.status != 200:
raise RuntimeError(f"Polling images faild. Code: {response.status}")
text = await response.text()
if not text or "GenerativeImagesStatusPage" in text:
await asyncio.sleep(1)
else:
break
error = None
try:
error = json.loads(text).get("errorMessage")
except:
pass
if error == "Pending":
raise RuntimeError("Prompt is been blocked")
elif error:
raise RuntimeError(error)
return read_images(text)
def read_images(html_content: str) -> List[str]:
"""
Extracts image URLs from the HTML content.
Args:
html_content (str): HTML content containing image URLs.
Returns:
List[str]: A list of image URLs.
"""
soup = BeautifulSoup(html_content, "html.parser")
tags = soup.find_all("img", class_="mimg")
if not tags:
tags = soup.find_all("img", class_="gir_mmimg")
images = [img["src"].split("?w=")[0] for img in tags]
if any(im in BAD_IMAGES for im in images):
raise RuntimeError("Bad images found")
if not images:
raise RuntimeError("No images found")
return images