summaryrefslogtreecommitdiffstats
path: root/g4f/Provider/MagickPen.py
blob: eab705360a322a2a8a98e7c8e720b6b2e98feb03 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from __future__ import annotations

import time
import random
import hashlib
import re
from aiohttp import ClientSession

from ..typing import AsyncResult, Messages
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from .helper import format_prompt

class MagickPen(AsyncGeneratorProvider, ProviderModelMixin):
    """Async provider for the magickpen.com chat endpoints.

    Two backend models are exposed:

    * ``free`` posts the formatted prompt as a one-message history to
      ``api_endpoint_free`` and yields the whole response body at once.
    * ``ask`` posts the formatted prompt as a query to ``api_endpoint_ask``
      and yields the response piece by piece as it is read.
    """
    url = "https://magickpen.com"
    api_endpoint_free = "https://api.magickpen.com/chat/free"
    api_endpoint_ask = "https://api.magickpen.com/ask"
    working = True
    supports_gpt_4 = True
    supports_stream = False

    default_model = 'free'
    models = ['free', 'ask']

    # The alias used to be listed twice ("free" and then "ask"). A dict
    # literal keeps only the last value for a repeated key, so "ask" was the
    # only mapping ever in effect; the dead duplicate has been removed.
    model_aliases = {
        "gpt-4o-mini": "ask",
    }

    @classmethod
    def get_model(cls, model: str) -> str:
        """Resolve a requested model name to one of ``cls.models``.

        Args:
            model: A native model name, an alias, or anything else.

        Returns:
            ``model`` itself when it is a native name, the alias target when
            it is a key of ``model_aliases``, otherwise ``default_model``.
        """
        if model in cls.models:
            return model
        return cls.model_aliases.get(model, cls.default_model)

    @classmethod
    async def get_secrets(cls, proxy: str = None):
        """Fetch the site's script bundle and build the request credentials.

        Args:
            proxy: Optional proxy URL used for the bundle download.

        Returns:
            A dict with the keys ``X-API-Secret``, ``signature``,
            ``timestamp``, ``nonce`` and ``secret``, or ``None`` when the
            bundle could not be downloaded or either secret could not be
            found in it.
        """
        # NOTE(review): the bundle file name looks like a build hash and may
        # change when the site is redeployed -- confirm it is still current.
        url = 'https://magickpen.com/_nuxt/02c76dc.js'
        async with ClientSession() as session:
            async with session.get(url, proxy=proxy) as response:
                if response.status != 200:
                    print(f"Error while fetching the file: {response.status}")
                    return None
                text = await response.text()

        x_api_secret_match = re.search(r'"X-API-Secret":"([^"]+)"', text)
        secret_match = re.search(r'secret:\s*"([^"]+)"', text)
        if x_api_secret_match is None or secret_match is None:
            # Returning a dict with None values would only fail later, when
            # the values are used as HTTP header values; report the failure
            # here so the caller can raise its own clear error.
            return None

        # Generate timestamp (milliseconds) and nonce dynamically
        timestamp = str(int(time.time() * 1000))
        nonce = str(random.random())

        # Signature: MD5 hex digest of the three parts, sorted and joined.
        # NOTE(review): "TGDBU9zCgM" is presumably a constant taken from the
        # site's client code -- verify against the bundle if requests fail.
        signature_parts = ["TGDBU9zCgM", timestamp, nonce]
        signature_string = "".join(sorted(signature_parts))
        signature = hashlib.md5(signature_string.encode()).hexdigest()

        return {
            'X-API-Secret': x_api_secret_match.group(1),
            'signature': signature,
            'timestamp': timestamp,
            'nonce': nonce,
            'secret': secret_match.group(1),
        }

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        proxy: str = None,
        **kwargs
    ) -> AsyncResult:
        """Send the conversation to MagickPen and yield the reply text.

        Args:
            model: Requested model name; resolved through ``get_model``.
            messages: Conversation, flattened with ``format_prompt``.
            proxy: Optional proxy URL, used for both the secrets download
                and the API request.
            **kwargs: Accepted for interface compatibility; unused here.

        Yields:
            The full response text for ``free``; decoded pieces of the
            response body for ``ask``.

        Raises:
            Exception: If the secrets could not be obtained.
            ValueError: If the resolved model is neither ``free`` nor ``ask``.
        """
        model = cls.get_model(model)

        # Use the same proxy for the bundle download as for the API call.
        secrets = await cls.get_secrets(proxy=proxy)
        if not secrets:
            raise Exception("Failed to obtain necessary secrets")

        headers = {
            "accept": "application/json, text/plain, */*",
            "accept-language": "en-US,en;q=0.9",
            "cache-control": "no-cache",
            "content-type": "application/json",
            "nonce": secrets['nonce'],
            "origin": "https://magickpen.com",
            "pragma": "no-cache",
            "priority": "u=1, i",
            "referer": "https://magickpen.com/",
            "sec-ch-ua": '"Chromium";v="127", "Not)A;Brand";v="99"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Linux"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-site",
            "secret": secrets['secret'],
            "signature": secrets['signature'],
            "timestamp": secrets['timestamp'],
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36",
            "x-api-secret": secrets['X-API-Secret']
        }

        async with ClientSession(headers=headers) as session:
            if model == 'free':
                data = {
                    "history": [{"role": "user", "content": format_prompt(messages)}]
                }
                async with session.post(cls.api_endpoint_free, json=data, proxy=proxy) as response:
                    response.raise_for_status()
                    yield await response.text()

            elif model == 'ask':
                data = {
                    "query": format_prompt(messages),
                    "plan": "Pay as you go"
                }
                async with session.post(cls.api_endpoint_ask, json=data, proxy=proxy) as response:
                    response.raise_for_status()
                    async for chunk in response.content:
                        if chunk:
                            yield chunk.decode()

            else:
                # Defensive: get_model only returns names from cls.models or
                # alias targets, so this is reached only if those are changed.
                raise ValueError(f"Unknown model: {model}")