import asyncio import os import mimetypes from typing import List, Optional from urllib.parse import urlparse import aiohttp from utils.randomizers import get_random_uuid async def download_file( url: str, save_directory: str, headers: Optional[dict] = None ) -> Optional[str]: try: os.makedirs(save_directory, exist_ok=True) parsed_url = urlparse(url) filename = os.path.basename(parsed_url.path) if not filename or "." not in filename: async with aiohttp.ClientSession() as session: async with session.head(url, headers=headers) as response: if response.status == 200: content_disposition = response.headers.get( "Content-Disposition", "" ) if "filename=" in content_disposition: filename = content_disposition.split("filename=")[1].strip( "\"'" ) else: content_type = response.headers.get("Content-Type", "") if content_type: extension = mimetypes.guess_extension( content_type.split(";")[0] ) if extension: filename = f"{get_random_uuid()}{extension}" filename = filename or get_random_uuid() save_path = os.path.join(save_directory, filename) async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: if response.status == 200: with open(save_path, "wb") as file: async for chunk in response.content.iter_chunked(8192): file.write(chunk) print(f"File downloaded successfully: {save_path}") return save_path else: print(f"Failed to download file. HTTP status: {response.status}") return None except Exception as e: print(f"Error downloading file from {url}: {e}") return None async def download_files( urls: List[str], save_directory: str, headers: Optional[dict] = None ) -> List[Optional[str]]: print(f"Starting download of {len(urls)} files to {save_directory}") coroutines = [download_file(url, save_directory, headers) for url in urls] results = await asyncio.gather(*coroutines, return_exceptions=True) final_results = [] for i, result in enumerate(results): if isinstance(result, Exception): print(f"Exception during download of {urls[i]}: {result}") final_results.append(None) else: final_results.append(result) successful_downloads = sum(1 for result in final_results if result is not None) print( f"Download completed: {successful_downloads}/{len(urls)} files downloaded successfully" ) return final_results