Created
July 24, 2023 16:01
-
-
Save ZeN220/d72f24a6b89055e5b327daa696c622b3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
import random | |
import re | |
from contextlib import asynccontextmanager | |
from typing import Final | |
from aiohttp import ClientSession | |
from base import BaseProxyGetter, Proxy | |
logger = logging.getLogger(__name__) | |
GET_PROXIES_COOLDOWN_REGEXP = re.compile( | |
r"\(Before a new request: (\d+) seconds\)" | |
) | |
class AstroProxyAPIError(Exception): | |
pass | |
class AstroProxyGetter(BaseProxyGetter): | |
API_URL: Final[str] = "https://astroproxy.com/api/v1" | |
def __init__( | |
self, | |
api_key: str, | |
session: ClientSession, | |
get_proxy_per_request: int = 399, | |
): | |
self.api_key = api_key | |
self.session = session | |
self.get_proxy_per_request = get_proxy_per_request | |
self.proxies: set[Proxy] = set() | |
self.used_proxies: set[Proxy] = set() | |
self.lock = asyncio.Lock() | |
self.event = asyncio.Event() | |
@asynccontextmanager | |
async def get_proxy(self) -> Proxy: | |
async with self.lock: | |
unused_proxies = await self.get_unused_proxies() | |
proxy = random.choice(tuple(unused_proxies)) | |
self.used_proxies.add(proxy) | |
yield proxy | |
self.event.set() | |
self.used_proxies.remove(proxy) | |
async def get_unused_proxies(self) -> set[Proxy]: | |
unused_proxies = self.proxies - self.used_proxies | |
while not unused_proxies: | |
tasks = [ | |
asyncio.create_task(self.event.wait()), | |
asyncio.create_task(self.get_proxies()), | |
] | |
results, _ = await asyncio.wait( | |
tasks, | |
return_when=asyncio.FIRST_COMPLETED, | |
) | |
for task in results: | |
result = task.result() | |
if isinstance(result, list): | |
self.proxies.update(result) | |
unused_proxies = self.proxies - self.used_proxies | |
return unused_proxies | |
async def get_proxies(self) -> list[Proxy]: | |
try: | |
response = await self.api_request( | |
"ports", | |
order="random", | |
count=self.get_proxy_per_request, | |
) | |
except AstroProxyAPIError as exc: | |
cooldown = GET_PROXIES_COOLDOWN_REGEXP.search(str(exc)) | |
if cooldown is None: | |
raise | |
# + 1 для подстраховки | |
cooldown_as_int = int(cooldown.group(1)) + 1 | |
await asyncio.sleep(cooldown_as_int) | |
return await self.get_proxies() | |
raw_proxies = response["data"]["ports"] | |
if not raw_proxies: | |
raise ValueError( | |
"Proxy service returned empty list of proxies. " | |
"Maybe you need to buy more proxies?" | |
) | |
proxies = [] | |
for proxy in raw_proxies: | |
proxy = Proxy( | |
scheme="socks5", | |
hostname=proxy["node"]["ip"], | |
port=proxy["ports"]["socks"], | |
username=proxy["access"]["login"], | |
password=proxy["access"]["password"], | |
id=proxy["id"], | |
) | |
a = Proxy( | |
scheme="socks5", | |
hostname="1.1.1.1", | |
port=1, | |
username="a", | |
password="b", | |
id=123123, | |
) | |
proxies.append(a) | |
proxies.append(proxy) | |
return proxies | |
async def api_request( | |
self, method: str, request_method: str = "GET", **params | |
): | |
params = params or {} | |
logger.debug( | |
"Send request %s method to proxy service with params %s", | |
method, | |
params, | |
) | |
if request_method == "GET": | |
response = await self._get_api_request(method, **params) | |
else: | |
response = await self._post_api_request(method, **params) | |
logger.debug( | |
"Response from proxy service method %s: %s", method, response | |
) | |
if response["status"] == "error": | |
raise AstroProxyAPIError(response["message"]) | |
return response | |
async def _get_api_request(self, method: str, **params) -> dict: | |
params.update({"token": self.api_key}) | |
async with self.session.get( | |
url=f"{self.API_URL}/{method}", | |
params=params, | |
) as result: | |
response = await result.json() | |
return response | |
async def _post_api_request(self, method: str, **params) -> dict: | |
async with self.session.post( | |
url=f"{self.API_URL}/{method}", | |
data=params, | |
params={"token": self.api_key}, | |
) as result: | |
response = await result.json() | |
return response | |
async def main(): | |
logging.basicConfig(level=10) | |
a = AstroProxyGetter("2d2baa9567a013a6", ClientSession()) | |
async def func(): | |
async with a.get_proxy() as proxy: | |
print(proxy.id) | |
await asyncio.sleep(5) | |
print("done!") | |
async def func2(): | |
async with a.get_proxy() as proxy: | |
print(proxy.id) | |
pass | |
await asyncio.gather(func(), func(), func2(), func2(), func2(), func2()) | |
__import__("asyncio").run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment