async def task(work_queue: asyncio.Queue, client: httpx.AsyncClient, result: list[tuple[Post, Tag]]) -> None:
while not work_queue.empty():
source: Source = await work_queue.get()
url = source.feed_url
logger.info(f"Working on {url}")
try:
res = await client.get(url)
except httpx.RequestError as exc:
msg = f"An error occurred while requesting {exc.request.url!r}."
logger.exception(msg)
source.last_result = "\n" + msg
await sync_to_async(source.save, thread_sensitive=True)(update_fields=["last_result"])
continue
if res.status_code not in range(200, 300):
msg = f"Status code: {res.status_code}"
logger.error(f"Source: {source}, " + msg)
source.last_result += "\n" + msg
await sync_to_async(source.save, thread_sensitive=True)(update_fields=["last_result"])
continue
logger.info(f"Parsing response from source: {source}")
feed = feedparser.parse(res)
if not feed.has_key("entries") or not feed.entries:
msg = f"No feed data for {source}"
logger.error(msg)
source.last_result += "\n" + msg
await sync_to_async(source.save, thread_sensitive=True)(update_fields=["last_result"])
continue
logger.info(f"Pulling posts from source: {source}")
posts_and_tags = pull_posts(source, feed)
logger.info(f"Compeleted pulling posts from source: {source}")
result.append(posts_and_tags)
async def gather_tasks() -> None:
# Create the queue of work
work_queue = asyncio.Queue()
# A mutable object to send to each task so task can update it
results: list[tuple[Post, Tag]] = []
# Put some work in the queue
async for source in Source.objects.filter(is_active=True):
await work_queue.put(source)
# Run the tasks
async with httpx.AsyncClient() as client:
await asyncio.gather(*[asyncio.create_task(task(work_queue, client, results)) for _ in range(cpu_cores_number * 2 + 1)])
return results
def refreshfeeds():
logger.info("Started pulling feeds...")
result = asyncio.run(gather_tasks())
Last active
February 8, 2024 17:55
-
-
Save mh-firouzjah/239fc5e99352407d693eff981ecd4a69 to your computer and use it in GitHub Desktop.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment