import json import os import re import time import logging from datetime import datetime import openpyxl from kivy.app import App from kivy.properties import ObjectProperty from kivy.uix.label import Label from kivy.uix.button import Button from kivy.uix.boxlayout import BoxLayout from kivy.uix.popup import Popup from kivy.uix.progressbar import ProgressBar from kivy.uix.filechooser import FileChooserListView from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException from colorama import Fore, Style, init init(autoreset=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filename=f'app_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log' ) CONFIG_FILE = 'config.json' CHROME_PROFILE_PATH = "/Users/vadymsamoilenko/Library/Application Support/Google/Chrome/Profile 1" PORTAL_URL = "https://mmmspinco.brand-portal.adobe.com" WAIT_TIMEOUT = 30 LONG_TIMEOUT = 60 MAX_RETRIES = 3 SEARCH_DELAY = 3 def get_last_used_directory(): if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, 'r') as file: config = json.load(file) return config.get('last_dir', '') return '' def save_last_used_directory(directory): with open(CONFIG_FILE, 'w') as file: json.dump({'last_dir': directory}, file) def normalize_string(s): return re.sub(r'[-_\s]+', '_', s.lower().strip()) def get_search_key(filename): if not filename: return "" base = os.path.splitext(filename)[0] return base.replace("-", "_") class FileSearchApp(App): file_path = ObjectProperty(None) def __init__(self): super().__init__() self.setup_logging() self.driver: webdriver.Chrome | None = None self.continue_button = None self.progress = None self.status_label = None self.label = None def setup_logging(self): self.logger = logging.getLogger(__name__) def build(self): self.file_path = "" self.extensions = [".psd", ".indd", ".tif", ".tiff"] layout = BoxLayout(orientation='vertical') self.label = Label(text="Select an Excel file with Asset IDs") layout.add_widget(self.label) # Создаем кнопки с использованием on_press вместо bind choose_button = Button(text="Choose File", on_press=self.select_file) layout.add_widget(choose_button) start_button = Button(text="Open Login Page", on_press=self.open_login_page) layout.add_widget(start_button) self.continue_button = Button( text="Continue After Login", disabled=True, on_press=self.start_search ) layout.add_widget(self.continue_button) self.progress = ProgressBar(max=100) layout.add_widget(self.progress) self.status_label = Label(text="Ready") layout.add_widget(self.status_label) return layout def select_file(self, instance): last_dir = get_last_used_directory() or '.' content = BoxLayout(orientation='vertical') filechooser = FileChooserListView(path=last_dir, filters=['*.xlsx']) def on_selection(instance, selection, touch=None): if selection: self.on_file_select(selection, popup) filechooser.bind(selection=on_selection) content.add_widget(filechooser) popup = Popup( title="Choose Excel File", content=content, size_hint=(0.9, 0.9) ) popup.open() def on_file_select(self, selection, popup): if selection: self.file_path = selection[0] self.label.text = f"Selected File: {self.file_path}" self.logger.info(f"Selected file: {self.file_path}") save_last_used_directory(os.path.dirname(self.file_path)) popup.dismiss() else: self.logger.warning("No file selected.") def wait_for_element(self, by, value, timeout=WAIT_TIMEOUT, retries=3): if self.driver is None: raise ValueError("WebDriver is not initialized") for attempt in range(retries): try: element = WebDriverWait(self.driver, timeout).until( EC.presence_of_element_located((by, value)) ) return element except TimeoutException: if attempt == retries - 1: raise time.sleep(2) return None def wait_for_search_results(self, timeout=LONG_TIMEOUT): if self.driver is None: return False try: WebDriverWait(self.driver, timeout).until( EC.presence_of_element_located((By.ID, "granite-omnisearch-result")) ) time.sleep(SEARCH_DELAY) return True except TimeoutException: return False def open_login_page(self, instance): if not self.file_path: popup = Popup( title='Warning', content=Label(text='Select a file before starting!'), size_hint=(0.8, 0.3) ) popup.open() return try: options = Options() options.add_argument(f"user-data-dir={CHROME_PROFILE_PATH}") self.driver = webdriver.Chrome(options=options) if self.driver is not None: self.driver.get(f"{PORTAL_URL}/mediaportal.html/content/dam/mac/mmmspinco") time.sleep(5) if self.wait_for_element(By.XPATH, "/html/body/coral-shell/coral-shell-content/div[1]/a/img"): self.continue_button.disabled = False self.logger.info("Login successful, ready to continue.") login_popup = Popup( title='Login', content=Label(text='Please log in and complete 2FA.\nClick "Continue After Login" once logged in.'), size_hint=(0.8, 0.3) ) login_popup.open() except Exception as e: self.logger.error(f"Error opening login page: {e}") if self.driver: self.driver.quit() def find_matching_link(self, file_name, psd_links): try: normalized_search_key = normalize_string(file_name) self.logger.info(f"Looking for links matching: {normalized_search_key}") best_match = None best_ratio = 0 for link in psd_links if isinstance(psd_links, list) else [psd_links]: try: href = link.get_attribute("href") if href: link_filename = os.path.splitext(href.split('/')[-1])[0] normalized_link = normalize_string(link_filename) if normalized_search_key in normalized_link or normalized_link in normalized_search_key: ratio = len(set(normalized_search_key.split('_')) & set(normalized_link.split('_'))) / \ len(set(normalized_search_key.split('_')) | set(normalized_link.split('_'))) if ratio > best_ratio: best_ratio = ratio best_match = href self.logger.info(f"Found better match: {href} (ratio: {ratio})") except Exception as e: self.logger.error(f"Error processing link: {e}") continue return best_match if best_match and best_ratio >= 0.5 else None except Exception as e: self.logger.error(f"Error finding PSD links: {e}") return None def process_row(self, row, total_rows): if self.driver is None: raise ValueError("WebDriver is not initialized") if not row or not hasattr(row[0], 'value'): return None, None asset_id = row[0].value file_name = row[2].value if len(row) >= 3 else None found_link = None if not asset_id or not file_name: return row[0].row if hasattr(row[0], 'row') else None, None self.status_label.text = f"Processing {asset_id}" self.logger.info(f"Processing Asset ID: {asset_id} with filename: {file_name}") for attempt in range(MAX_RETRIES): try: self.driver.get(f"{PORTAL_URL}/aem/search.html") time.sleep(SEARCH_DELAY) search_box = WebDriverWait(self.driver, WAIT_TIMEOUT).until( EC.presence_of_element_located((By.CSS_SELECTOR, "input[name='fulltext']")) ) search_box.clear() search_box.send_keys(asset_id) search_box.send_keys(Keys.RETURN) WebDriverWait(self.driver, LONG_TIMEOUT).until( EC.presence_of_element_located((By.ID, "granite-omnisearch-result")) ) time.sleep(SEARCH_DELAY) asset_element = WebDriverWait(self.driver, WAIT_TIMEOUT).until( EC.presence_of_element_located(( By.XPATH, f"//coral-card-subtitle[contains(@class, 'foundation-collection-item-subtitle') and text()='{file_name}']" )) ) if asset_element: asset_card = asset_element.find_element(By.XPATH, "./ancestor::coral-card") self.driver.execute_script("arguments[0].scrollIntoView(true);", asset_card) time.sleep(1) self.driver.execute_script(""" var element = arguments[0]; var rect = element.getBoundingClientRect(); var event = new MouseEvent('mouseover', { 'view': window, 'bubbles': true, 'cancelable': true, 'clientX': rect.left + rect.width/2, 'clientY': rect.top + rect.height/2 }); element.dispatchEvent(event); """, asset_card) time.sleep(1) properties_button = WebDriverWait(self.driver, 10).until( EC.element_to_be_clickable((By.CSS_SELECTOR, "button[title='Properties']")) ) self.driver.execute_script("arguments[0].click();", properties_button) time.sleep(2) psd_links = WebDriverWait(self.driver, LONG_TIMEOUT).until( EC.presence_of_all_elements_located(( By.XPATH, "//div[contains(@class, 'references-referencing')]//a[contains(@data-asset-path, '.psd')]" )) ) if psd_links: found_link = self.find_matching_link(file_name, psd_links) if found_link: break else: self.logger.warning(f"Asset card not found for {asset_id}") except Exception as e: self.logger.error(f"Error processing {asset_id} (attempt {attempt + 1}): {e}") if attempt == MAX_RETRIES - 1: break time.sleep(2) if hasattr(row[0], 'row'): progress = (row[0].row / total_rows) * 100 if total_rows > 0 else 0 self.progress.value = progress return row[0].row if hasattr(row[0], 'row') else None, found_link def start_search(self, instance): if not self.file_path or self.continue_button.disabled: popup = Popup( title='Warning', content=Label(text='Ensure you are logged in before starting!'), size_hint=(0.8, 0.3) ) popup.open() return try: wb = openpyxl.load_workbook(self.file_path) if wb is None: raise ValueError("Failed to load workbook") sheet = wb.active if sheet is None: raise ValueError("Failed to get active sheet") total_rows = sheet.max_row - 1 if sheet.max_row > 1 else 1 for row in sheet.iter_rows(min_row=2, max_row=sheet.max_row): try: row_num, found_link = self.process_row(row, total_rows) if row_num is not None: sheet.cell(row=row_num, column=2, value=found_link if found_link else "Links not found") save_path = self.file_path.replace(".xlsx", "_updated.xlsx") wb.save(save_path) except Exception as e: self.logger.error(f"Error processing row: {e}") continue self.logger.info("Processing completed") success_popup = Popup( title='Success', content=Label(text=f"Processing completed. File saved."), size_hint=(0.8, 0.3) ) success_popup.open() except Exception as e: self.logger.error(f"Error during search: {e}") error_popup = Popup( title='Error', content=Label(text=f"An error occurred: {str(e)}"), size_hint=(0.8, 0.3) ) error_popup.open() finally: if self.driver: self.driver.quit() if __name__ == '__main__': FileSearchApp().run()