376 lines
No EOL
14 KiB
Python
376 lines
No EOL
14 KiB
Python
import json
|
||
import os
|
||
import re
|
||
import time
|
||
import logging
|
||
from datetime import datetime
|
||
import openpyxl
|
||
from kivy.app import App
|
||
from kivy.properties import ObjectProperty
|
||
from kivy.uix.label import Label
|
||
from kivy.uix.button import Button
|
||
from kivy.uix.boxlayout import BoxLayout
|
||
from kivy.uix.popup import Popup
|
||
from kivy.uix.progressbar import ProgressBar
|
||
from kivy.uix.filechooser import FileChooserListView
|
||
from selenium import webdriver
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.common.keys import Keys
|
||
from selenium.webdriver.chrome.options import Options
|
||
from selenium.webdriver.common.action_chains import ActionChains
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException
|
||
from colorama import Fore, Style, init
|
||
|
||
init(autoreset=True)
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||
filename=f'app_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
|
||
)
|
||
|
||
CONFIG_FILE = 'config.json'
|
||
CHROME_PROFILE_PATH = "/Users/vadymsamoilenko/Library/Application Support/Google/Chrome/Profile 1"
|
||
PORTAL_URL = "https://mmmspinco.brand-portal.adobe.com"
|
||
WAIT_TIMEOUT = 30
|
||
LONG_TIMEOUT = 60
|
||
MAX_RETRIES = 3
|
||
SEARCH_DELAY = 3
|
||
|
||
def get_last_used_directory():
|
||
if os.path.exists(CONFIG_FILE):
|
||
with open(CONFIG_FILE, 'r') as file:
|
||
config = json.load(file)
|
||
return config.get('last_dir', '')
|
||
return ''
|
||
|
||
def save_last_used_directory(directory):
|
||
with open(CONFIG_FILE, 'w') as file:
|
||
json.dump({'last_dir': directory}, file)
|
||
|
||
def normalize_string(s):
|
||
return re.sub(r'[-_\s]+', '_', s.lower().strip())
|
||
|
||
def get_search_key(filename):
|
||
if not filename:
|
||
return ""
|
||
base = os.path.splitext(filename)[0]
|
||
return base.replace("-", "_")
|
||
|
||
class FileSearchApp(App):
|
||
file_path = ObjectProperty(None)
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.setup_logging()
|
||
self.driver: webdriver.Chrome | None = None
|
||
self.continue_button = None
|
||
self.progress = None
|
||
self.status_label = None
|
||
self.label = None
|
||
|
||
def setup_logging(self):
|
||
self.logger = logging.getLogger(__name__)
|
||
|
||
def build(self):
|
||
self.file_path = ""
|
||
self.extensions = [".psd", ".indd", ".tif", ".tiff"]
|
||
|
||
layout = BoxLayout(orientation='vertical')
|
||
self.label = Label(text="Select an Excel file with Asset IDs")
|
||
layout.add_widget(self.label)
|
||
|
||
# Создаем кнопки с использованием on_press вместо bind
|
||
choose_button = Button(text="Choose File", on_press=self.select_file)
|
||
layout.add_widget(choose_button)
|
||
|
||
start_button = Button(text="Open Login Page", on_press=self.open_login_page)
|
||
layout.add_widget(start_button)
|
||
|
||
self.continue_button = Button(
|
||
text="Continue After Login",
|
||
disabled=True,
|
||
on_press=self.start_search
|
||
)
|
||
layout.add_widget(self.continue_button)
|
||
|
||
self.progress = ProgressBar(max=100)
|
||
layout.add_widget(self.progress)
|
||
|
||
self.status_label = Label(text="Ready")
|
||
layout.add_widget(self.status_label)
|
||
|
||
return layout
|
||
|
||
def select_file(self, instance):
|
||
last_dir = get_last_used_directory() or '.'
|
||
content = BoxLayout(orientation='vertical')
|
||
filechooser = FileChooserListView(path=last_dir, filters=['*.xlsx'])
|
||
|
||
def on_selection(instance, selection, touch=None):
|
||
if selection:
|
||
self.on_file_select(selection, popup)
|
||
|
||
filechooser.bind(selection=on_selection)
|
||
content.add_widget(filechooser)
|
||
|
||
popup = Popup(
|
||
title="Choose Excel File",
|
||
content=content,
|
||
size_hint=(0.9, 0.9)
|
||
)
|
||
popup.open()
|
||
|
||
def on_file_select(self, selection, popup):
|
||
if selection:
|
||
self.file_path = selection[0]
|
||
self.label.text = f"Selected File: {self.file_path}"
|
||
self.logger.info(f"Selected file: {self.file_path}")
|
||
save_last_used_directory(os.path.dirname(self.file_path))
|
||
popup.dismiss()
|
||
else:
|
||
self.logger.warning("No file selected.")
|
||
|
||
def wait_for_element(self, by, value, timeout=WAIT_TIMEOUT, retries=3):
|
||
if self.driver is None:
|
||
raise ValueError("WebDriver is not initialized")
|
||
|
||
for attempt in range(retries):
|
||
try:
|
||
element = WebDriverWait(self.driver, timeout).until(
|
||
EC.presence_of_element_located((by, value))
|
||
)
|
||
return element
|
||
except TimeoutException:
|
||
if attempt == retries - 1:
|
||
raise
|
||
time.sleep(2)
|
||
return None
|
||
|
||
def wait_for_search_results(self, timeout=LONG_TIMEOUT):
|
||
if self.driver is None:
|
||
return False
|
||
|
||
try:
|
||
WebDriverWait(self.driver, timeout).until(
|
||
EC.presence_of_element_located((By.ID, "granite-omnisearch-result"))
|
||
)
|
||
time.sleep(SEARCH_DELAY)
|
||
return True
|
||
except TimeoutException:
|
||
return False
|
||
|
||
def open_login_page(self, instance):
|
||
if not self.file_path:
|
||
popup = Popup(
|
||
title='Warning',
|
||
content=Label(text='Select a file before starting!'),
|
||
size_hint=(0.8, 0.3)
|
||
)
|
||
popup.open()
|
||
return
|
||
|
||
try:
|
||
options = Options()
|
||
options.add_argument(f"user-data-dir={CHROME_PROFILE_PATH}")
|
||
self.driver = webdriver.Chrome(options=options)
|
||
|
||
if self.driver is not None:
|
||
self.driver.get(f"{PORTAL_URL}/mediaportal.html/content/dam/mac/mmmspinco")
|
||
time.sleep(5)
|
||
|
||
if self.wait_for_element(By.XPATH, "/html/body/coral-shell/coral-shell-content/div[1]/a/img"):
|
||
self.continue_button.disabled = False
|
||
self.logger.info("Login successful, ready to continue.")
|
||
login_popup = Popup(
|
||
title='Login',
|
||
content=Label(text='Please log in and complete 2FA.\nClick "Continue After Login" once logged in.'),
|
||
size_hint=(0.8, 0.3)
|
||
)
|
||
login_popup.open()
|
||
except Exception as e:
|
||
self.logger.error(f"Error opening login page: {e}")
|
||
if self.driver:
|
||
self.driver.quit()
|
||
|
||
def find_matching_link(self, file_name, psd_links):
|
||
try:
|
||
normalized_search_key = normalize_string(file_name)
|
||
self.logger.info(f"Looking for links matching: {normalized_search_key}")
|
||
|
||
best_match = None
|
||
best_ratio = 0
|
||
|
||
for link in psd_links if isinstance(psd_links, list) else [psd_links]:
|
||
try:
|
||
href = link.get_attribute("href")
|
||
if href:
|
||
link_filename = os.path.splitext(href.split('/')[-1])[0]
|
||
normalized_link = normalize_string(link_filename)
|
||
|
||
if normalized_search_key in normalized_link or normalized_link in normalized_search_key:
|
||
ratio = len(set(normalized_search_key.split('_')) & set(normalized_link.split('_'))) / \
|
||
len(set(normalized_search_key.split('_')) | set(normalized_link.split('_')))
|
||
|
||
if ratio > best_ratio:
|
||
best_ratio = ratio
|
||
best_match = href
|
||
self.logger.info(f"Found better match: {href} (ratio: {ratio})")
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error processing link: {e}")
|
||
continue
|
||
|
||
return best_match if best_match and best_ratio >= 0.5 else None
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error finding PSD links: {e}")
|
||
return None
|
||
|
||
def process_row(self, row, total_rows):
|
||
if self.driver is None:
|
||
raise ValueError("WebDriver is not initialized")
|
||
|
||
if not row or not hasattr(row[0], 'value'):
|
||
return None, None
|
||
|
||
asset_id = row[0].value
|
||
file_name = row[2].value if len(row) >= 3 else None
|
||
found_link = None
|
||
|
||
if not asset_id or not file_name:
|
||
return row[0].row if hasattr(row[0], 'row') else None, None
|
||
|
||
self.status_label.text = f"Processing {asset_id}"
|
||
self.logger.info(f"Processing Asset ID: {asset_id} with filename: {file_name}")
|
||
|
||
for attempt in range(MAX_RETRIES):
|
||
try:
|
||
self.driver.get(f"{PORTAL_URL}/aem/search.html")
|
||
time.sleep(SEARCH_DELAY)
|
||
|
||
search_box = WebDriverWait(self.driver, WAIT_TIMEOUT).until(
|
||
EC.presence_of_element_located((By.CSS_SELECTOR, "input[name='fulltext']"))
|
||
)
|
||
search_box.clear()
|
||
search_box.send_keys(asset_id)
|
||
search_box.send_keys(Keys.RETURN)
|
||
|
||
WebDriverWait(self.driver, LONG_TIMEOUT).until(
|
||
EC.presence_of_element_located((By.ID, "granite-omnisearch-result"))
|
||
)
|
||
time.sleep(SEARCH_DELAY)
|
||
|
||
asset_element = WebDriverWait(self.driver, WAIT_TIMEOUT).until(
|
||
EC.presence_of_element_located((
|
||
By.XPATH,
|
||
f"//coral-card-subtitle[contains(@class, 'foundation-collection-item-subtitle') and text()='{file_name}']"
|
||
))
|
||
)
|
||
|
||
if asset_element:
|
||
asset_card = asset_element.find_element(By.XPATH, "./ancestor::coral-card")
|
||
self.driver.execute_script("arguments[0].scrollIntoView(true);", asset_card)
|
||
time.sleep(1)
|
||
|
||
self.driver.execute_script("""
|
||
var element = arguments[0];
|
||
var rect = element.getBoundingClientRect();
|
||
var event = new MouseEvent('mouseover', {
|
||
'view': window,
|
||
'bubbles': true,
|
||
'cancelable': true,
|
||
'clientX': rect.left + rect.width/2,
|
||
'clientY': rect.top + rect.height/2
|
||
});
|
||
element.dispatchEvent(event);
|
||
""", asset_card)
|
||
time.sleep(1)
|
||
|
||
properties_button = WebDriverWait(self.driver, 10).until(
|
||
EC.element_to_be_clickable((By.CSS_SELECTOR, "button[title='Properties']"))
|
||
)
|
||
self.driver.execute_script("arguments[0].click();", properties_button)
|
||
time.sleep(2)
|
||
|
||
psd_links = WebDriverWait(self.driver, LONG_TIMEOUT).until(
|
||
EC.presence_of_all_elements_located((
|
||
By.XPATH,
|
||
"//div[contains(@class, 'references-referencing')]//a[contains(@data-asset-path, '.psd')]"
|
||
))
|
||
)
|
||
|
||
if psd_links:
|
||
found_link = self.find_matching_link(file_name, psd_links)
|
||
if found_link:
|
||
break
|
||
else:
|
||
self.logger.warning(f"Asset card not found for {asset_id}")
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error processing {asset_id} (attempt {attempt + 1}): {e}")
|
||
if attempt == MAX_RETRIES - 1:
|
||
break
|
||
time.sleep(2)
|
||
|
||
if hasattr(row[0], 'row'):
|
||
progress = (row[0].row / total_rows) * 100 if total_rows > 0 else 0
|
||
self.progress.value = progress
|
||
|
||
return row[0].row if hasattr(row[0], 'row') else None, found_link
|
||
|
||
def start_search(self, instance):
|
||
if not self.file_path or self.continue_button.disabled:
|
||
popup = Popup(
|
||
title='Warning',
|
||
content=Label(text='Ensure you are logged in before starting!'),
|
||
size_hint=(0.8, 0.3)
|
||
)
|
||
popup.open()
|
||
return
|
||
|
||
try:
|
||
wb = openpyxl.load_workbook(self.file_path)
|
||
if wb is None:
|
||
raise ValueError("Failed to load workbook")
|
||
|
||
sheet = wb.active
|
||
if sheet is None:
|
||
raise ValueError("Failed to get active sheet")
|
||
|
||
total_rows = sheet.max_row - 1 if sheet.max_row > 1 else 1
|
||
|
||
for row in sheet.iter_rows(min_row=2, max_row=sheet.max_row):
|
||
try:
|
||
row_num, found_link = self.process_row(row, total_rows)
|
||
if row_num is not None:
|
||
sheet.cell(row=row_num, column=2, value=found_link if found_link else "Links not found")
|
||
save_path = self.file_path.replace(".xlsx", "_updated.xlsx")
|
||
wb.save(save_path)
|
||
except Exception as e:
|
||
self.logger.error(f"Error processing row: {e}")
|
||
continue
|
||
|
||
self.logger.info("Processing completed")
|
||
success_popup = Popup(
|
||
title='Success',
|
||
content=Label(text=f"Processing completed. File saved."),
|
||
size_hint=(0.8, 0.3)
|
||
)
|
||
success_popup.open()
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error during search: {e}")
|
||
error_popup = Popup(
|
||
title='Error',
|
||
content=Label(text=f"An error occurred: {str(e)}"),
|
||
size_hint=(0.8, 0.3)
|
||
)
|
||
error_popup.open()
|
||
finally:
|
||
if self.driver:
|
||
self.driver.quit()
|
||
|
||
if __name__ == '__main__':
|
||
FileSearchApp().run() |