summaryrefslogblamecommitdiffstats
path: root/g4f/Provider/MagickPen.py
blob: 0f476ecaa3aa62da3f89bef2057ae068706e78f1 (plain) (tree)














































































































                                                                                                                                  
from __future__ import annotations

import time
import random
import hashlib
import re
from aiohttp import ClientSession

from ..typing import AsyncResult, Messages
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from .helper import format_prompt

class MagickPen(AsyncGeneratorProvider, ProviderModelMixin):
    url = "https://magickpen.com"
    api_endpoint = "https://api.magickpen.com/chat/free"
    working = True
    supports_gpt_4 = True
    supports_stream = False
    
    default_model = 'gpt-4o-mini'
    models = ['gpt-4o-mini']
    
    model_aliases = {}

    @classmethod
    def get_model(cls, model: str) -> str:
        if model in cls.models:
            return model
        elif model in cls.model_aliases:
            return cls.model_aliases[model]
        else:
            return cls.default_model

    @classmethod
    async def get_secrets(cls):
        url = 'https://magickpen.com/_nuxt/02c76dc.js'
        async with ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    text = await response.text()
                    x_api_secret_match = re.search(r'"X-API-Secret":"([^"]+)"', text)
                    secret_match = re.search(r'secret:\s*"([^"]+)"', text)
                    
                    x_api_secret = x_api_secret_match.group(1) if x_api_secret_match else None
                    secret = secret_match.group(1) if secret_match else None
                    
                    # Generate timestamp and nonce dynamically
                    timestamp = str(int(time.time() * 1000))
                    nonce = str(random.random())
                    
                    # Generate signature
                    signature_parts = ["TGDBU9zCgM", timestamp, nonce]
                    signature_string = "".join(sorted(signature_parts))
                    signature = hashlib.md5(signature_string.encode()).hexdigest()
                    
                    return {
                        'X-API-Secret': x_api_secret,
                        'signature': signature,
                        'timestamp': timestamp,
                        'nonce': nonce,
                        'secret': secret
                    }
                else:
                    print(f"Error while fetching the file: {response.status}")
                    return None

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        proxy: str = None,
        **kwargs
    ) -> AsyncResult:
        model = cls.get_model(model)
        
        secrets = await cls.get_secrets()
        if not secrets:
            raise Exception("Failed to obtain necessary secrets")

        headers = {
            "accept": "application/json, text/plain, */*",
            "accept-language": "en-US,en;q=0.9",
            "cache-control": "no-cache",
            "content-type": "application/json",
            "nonce": secrets['nonce'],
            "origin": "https://magickpen.com",
            "pragma": "no-cache",
            "priority": "u=1, i",
            "referer": "https://magickpen.com/",
            "sec-ch-ua": '"Chromium";v="127", "Not)A;Brand";v="99"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Linux"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-site",
            "secret": secrets['secret'],
            "signature": secrets['signature'],
            "timestamp": secrets['timestamp'],
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36",
            "x-api-secret": secrets['X-API-Secret']
        }
        
        async with ClientSession(headers=headers) as session:
            data = {
                "history": [{"role": "user", "content": format_prompt(messages)}]
            }
            async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
                response.raise_for_status()
                result = await response.text()
                yield result