"""Track Amazon search ranks for products listed in a Google Sheet.

The script reads keyword/product rows and location headers from a worksheet,
searches every keyword on Amazon (via Selenium) once per location, and writes
the rank / ad-status of each tracked product into newly inserted columns.
"""

import random
import time
from datetime import datetime

import gspread
from amazoncaptcha import AmazonCaptcha
from google.oauth2.service_account import Credentials
from gspread.exceptions import APIError
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# NOTE(review): the three values below were blank in the reviewed copy of this
# file (the literals had been stripped). They are reconstructed from context
# and must be confirmed before running.
GOOGLE_API_SCOPES = [
    "https://www.googleapis.com/auth/spreadsheets",
    "https://www.googleapis.com/auth/drive",
]
AMAZON_HOME_URL = "https://www.amazon.in/"  # assumed storefront (pincode flow)
AMAZON_TITLE_MARKER = "Amazon"  # substring expected in a normal page title

# Sheet layout used by the reader and writer functions below.
LOCATION_HEADER_ROW = 2    # row holding one location per result column
TIMESTAMP_HEADER_ROW = 3   # row holding the run timestamp per result column
FIRST_DATA_ROW = 4         # first keyword/product row
FIRST_RESULT_COL = 4       # first column holding per-location results


def log_step(step):
    """Print *step* prefixed with the current local time."""
    # NOTE(review): the timestamp expression was missing in the source; the
    # exact format is an assumption.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] {step}")


# Set up Chrome options
def create_headless_chrome_driver():
    """Create and return a Chrome WebDriver with a 1920x1080 window.

    Despite the name, headless mode and image blocking are currently
    disabled; the corresponding lines are left as commented-out toggles.
    """
    log_step("Setting up Chrome WebDriver")
    options = Options()

    # Disable images
    prefs = {
        "profile.managed_default_content_settings.images": 2,  # 2 = block images
        "profile.default_content_setting_values.images": 2,
    }
    # options.add_experimental_option("prefs", prefs)
    # options.add_argument("--headless=new")
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--disable-gpu')

    desktop_user_agent = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    )
    options.add_argument(f'user-agent={desktop_user_agent}')

    service = Service('/usr/bin/chromedriver')
    driver = webdriver.Chrome(service=service, options=options)
    driver.set_window_size(1920, 1080)
    return driver


# Set up Google Sheets API
def get_google_sheets_client(credentials_file):
    """Return a gspread client authorised with a service-account key file."""
    log_step("Setting up Google Sheets API client")
    credentials = Credentials.from_service_account_file(
        credentials_file, scopes=GOOGLE_API_SCOPES
    )
    return gspread.authorize(credentials)


def safe_google_sheets_request(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, retrying on quota errors.

    An ``APIError`` whose text contains "quota exceeded" triggers a 60 second
    wait and another attempt (five attempts in total). Any other ``APIError``
    is re-raised immediately.

    Returns:
        The result of *func*, or ``None`` when every attempt hit the quota.
    """
    max_retries = 5
    wait_time = 60
    for _ in range(max_retries):
        try:
            return func(*args, **kwargs)
        except APIError as e:
            if 'quota exceeded' not in str(e).lower():
                raise  # bare raise keeps the original traceback
            log_step(f"Quota exceeded. Waiting for {wait_time} seconds before retrying...")
            time.sleep(wait_time)
    log_step("Max retries exceeded. Unable to complete the request.")
    return None


# Add human-like interaction
def simulate_human_interaction(driver):
    """Perform a few small random mouse moves and scrolls on the page.

    Every individual move/scroll is best-effort: failures are logged and the
    simulation carries on.
    """
    log_step("Simulating human-like interaction...")

    # Ensure the driver is not idle
    try:
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.TAG_NAME, 'body'))
        )
    except TimeoutException:
        log_step("Unable to locate body tag for interaction.")
        return

    for _ in range(random.randint(2, 5)):
        try:
            x_offset = random.randint(-10, 10)
            y_offset = random.randint(-10, 10)
            log_step(f"Attempting to move mouse by offset ({x_offset}, {y_offset})")
            # A fresh chain per move: a reused chain is expected to keep its
            # earlier queued moves and replay them on the next perform().
            ActionChains(driver).move_by_offset(x_offset, y_offset).perform()
        except Exception as e:
            log_step(f"Error during mouse movement simulation: {e}")
        time.sleep(random.uniform(0.1, 0.5))

    # Scroll up and down randomly
    for _ in range(random.randint(1, 3)):
        try:
            scroll_value = random.randint(-300, 300)
            driver.execute_script(f"window.scrollBy(0, {scroll_value});")
            log_step(f"Scrolled by {scroll_value} pixels.")
        except Exception as e:
            log_step(f"Error during scrolling simulation: {e}")
        time.sleep(random.uniform(0.2, 0.6))

    log_step("Human-like interaction simulated.")


def handle_amazon_errors(driver):
    """Handle various Amazon anti-bot pages and CAPTCHAs.

    Checks the current page, in order, for the "rush hour" error page, a
    CAPTCHA prompt and a robot-check page, and reacts accordingly.

    Returns:
        True when the page looks usable (or an interstitial was handled),
        False when three attempts did not get there.
    """
    max_retries = 3
    rush_hour_text = (
        "It's rush hour and traffic is piling up on that page. "
        "Please try again in a short while."
    )

    for attempt in range(max_retries):
        try:
            page_source = driver.page_source

            # Check for "Oops" page
            if rush_hour_text in page_source:
                log_step("Detected 'Oops' page, handling...")
                WebDriverWait(driver, 10).until(
                    EC.element_to_be_clickable(
                        (By.XPATH, "//a[contains(@href, 'ref=cs_503_link')]")
                    )
                ).click()
                time.sleep(random.uniform(2, 4))
                return True

            # Check for CAPTCHA
            if "Enter the characters you see below" in page_source:
                return solve_captcha(driver)

            # Check for "robot" verification
            if "robot check" in page_source.lower():
                log_step("Detected robot verification page")
                return solve_captcha(driver)

            # If we reached a valid page, return success
            if AMAZON_TITLE_MARKER in driver.title:
                return True
        except Exception as e:
            log_step(f"Error handling Amazon page: {str(e)}")
            if attempt == max_retries - 1:
                driver.save_screenshot("error_screenshot.png")

        # Give the page a moment before looking at it again; re-checking
        # immediately would just see the same content.
        if attempt < max_retries - 1:
            time.sleep(2)

    log_step("Failed to resolve Amazon error page after retries")
    return False


def solve_captcha(driver):
    """Try to solve and submit the Amazon image CAPTCHA on the current page.

    Makes up to three attempts; after a failed attempt the cookies are cleared
    and the page is refreshed. A screenshot is saved when all attempts fail.

    Returns:
        True once a solution has been typed in and submitted, else False.
    """
    max_retries = 3
    for _ in range(max_retries):
        try:
            log_step("Attempting to solve CAPTCHA...")

            # Get CAPTCHA image
            captcha_img = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located(
                    (By.XPATH, "//img[contains(@src, 'captcha')]")
                )
            )
            log_step(captcha_img)
            captcha_url = captcha_img.get_attribute('src')
            log_step(captcha_url)

            # Solve using library
            captcha = AmazonCaptcha.fromlink(captcha_url)
            solution = captcha.solve(keep_logs=True)
            log_step(solution)

            if "Not solved" in solution:
                log_step("Failed to solve CAPTCHA")
                raise RuntimeError("CAPTCHA library could not solve the image")

            # Input solution
            captcha_input = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.ID, 'captchacharacters'))
            )
            captcha_input.clear()
            captcha_input.send_keys(solution)

            # Submit form
            submit_button = WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[contains(text(), 'Continue shopping')]")
                )
            )
            # The button was previously located but never clicked, so the
            # typed solution was never actually submitted.
            submit_button.click()

            time.sleep(random.uniform(2, 3))
            return True
        except Exception as e:
            log_step(f"CAPTCHA solving failed: {str(e)}")
            driver.delete_all_cookies()
            driver.refresh()

    driver.save_screenshot("captcha_error.png")
    return False


def read_data_grouped_by_keyword(sheet):
    """Read the product rows of *sheet* and group them by keyword.

    The first three rows are treated as headers and skipped. In each remaining
    row, column A is the keyword, column B the product name and column C the
    product id.

    NOTE(review): results are later written back row by row in the grouped
    order, so rows sharing a keyword need to be contiguous in the sheet for
    the output rows to line up with the input rows.

    Returns:
        dict mapping keyword -> list of ``(product_name, product_id)`` tuples,
        in sheet order.
    """
    rows = sheet.get_all_values()[FIRST_DATA_ROW - 1:]  # Skip the three header rows
    grouped_data = {}
    for row in rows:
        # Pad short rows instead of raising IndexError; rows are never dropped
        # because row positions matter when results are written back.
        keyword, product_name, product_id = (list(row) + ["", "", ""])[:3]
        grouped_data.setdefault(keyword, []).append((product_name, product_id))
    return grouped_data


def read_locations(sheet):
    """Return the locations listed in row 2, from column D onwards.

    Reading stops at the first blank cell, so only the leading contiguous run
    of location headers is returned.
    """
    header_row = sheet.row_values(LOCATION_HEADER_ROW)
    log_step(header_row)
    log_step("In read locations")

    locations = []
    for loc in header_row[FIRST_RESULT_COL - 1:]:
        if not loc.strip():
            break
        locations.append(loc)
    return locations


def input_pincode(driver, pincode, retries=3):
    """Type *pincode* into Amazon's location dialog and submit it.

    Args:
        driver: WebDriver with the location dialog open.
        pincode: Text to enter in the ``GLUXZipUpdateInput`` field.
        retries: Number of attempts; the page is refreshed between attempts.

    Returns:
        True when the pincode was entered and submitted, False otherwise.
    """
    for attempt in range(retries):
        try:
            log_step("Waiting for pincode input element to be present...")
            pincode_input = WebDriverWait(driver, 5).until(
                EC.presence_of_element_located((By.ID, 'GLUXZipUpdateInput'))
            )
            log_step("Pincode input element found")

            # Ensure the element is interactable; if it never becomes
            # clickable we fall through with the element found above.
            for inner_attempt in range(5):
                try:
                    log_step("Waiting for pincode input element to be clickable...")
                    pincode_input = WebDriverWait(driver, 10).until(
                        EC.element_to_be_clickable((By.ID, 'GLUXZipUpdateInput'))
                    )
                    log_step("Pincode input element is clickable")
                    break
                except Exception as e:
                    log_step(f"Element not clickable yet, retrying: {e}")
                    time.sleep(1 + inner_attempt)  # Increasing delay

            # Scroll to the element to ensure it's in view
            driver.execute_script("arguments[0].scrollIntoView(true);", pincode_input)

            # Click through JavaScript so an overlay cannot intercept the click
            driver.execute_script("arguments[0].click();", pincode_input)

            # Input the pincode
            pincode_input.clear()
            pincode_input.send_keys(pincode)
            pincode_input.send_keys(Keys.RETURN)  # Press Enter to apply the pincode

            # Wait for the pincode to be processed
            simulate_human_interaction(driver)
            return True
        except Exception as e:
            log_step(f"Pincode input attempt {attempt + 1} failed: {e}")
            if attempt < retries - 1:
                log_step("Reloading the page and retrying...")
                driver.refresh()
                time.sleep(5)  # Wait for the page to reload
            else:
                log_step("Pincode input failed after retries")
                return False
    return False


def set_location(driver, location):
    """Open the delivery-location popover and apply *location* as pincode.

    Raises:
        TimeoutException: when the location popover link does not appear
            within 10 seconds.
    """
    location_button_id = 'nav-global-location-popover-link'
    try:
        log_step(f"Setting location: {location}")
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, location_button_id))
        ).click()

        # Report the real outcome instead of always claiming success.
        if input_pincode(driver, location):
            log_step(f"Location {location} set successfully")
        else:
            log_step(f"Failed to set location: {location}")
    except TimeoutException:
        log_step(f"Timeout occurred while setting location: {location}")
        raise


def get_ranks_and_statuses_for_keyword(driver, keyword, products):
    """Search *keyword* and find where each tracked product appears.

    Args:
        driver: WebDriver currently on an Amazon page with a search box.
        keyword: Search phrase to type into the search box.
        products: Iterable of ``(product_name, product_id)`` tuples, where
            ``product_id`` is compared with each result's ``data-asin``.

    Returns:
        dict mapping every product id to a list of ``(rank, status)`` tuples.
        ``rank`` is the 1-based position among the result items on the page
        and ``status`` is ``"AD"`` or ``"ORGANIC"``. The list is empty when
        the product was not found or the search failed.
    """
    def empty_results():
        return {product_id: [] for _, product_id in products}

    try:
        log_step(f"Searching for keyword: {keyword}")

        # Check for error pages before searching
        if not handle_amazon_errors(driver):
            log_step("Could not resolve error page, returning empty results")
            return empty_results()

        search_box_xpath = '//input[contains(@aria-label, "Search")]'
        product_list_class = 'sg-col-inner'
        ad_indicator_xpath = './/span[contains(@class, "puis-label-popover-default")]'
        product_xpath = (
            './/div[contains(@class, "s-result-item") and contains(@class, "s-asin")]'
        )

        search_box = WebDriverWait(driver, 20).until(
            EC.element_to_be_clickable((By.XPATH, search_box_xpath))
        )
        # Clear any previous query: clear() plus select-all/backspace.
        search_box.clear()
        search_box.send_keys(Keys.CONTROL + "a")
        search_box.send_keys(Keys.BACKSPACE)
        time.sleep(1)
        search_box.send_keys(keyword)
        time.sleep(1)
        search_box.send_keys(Keys.RETURN)
        time.sleep(5)

        # Wait for the product grid to load
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.CLASS_NAME, product_list_class))
        )

        product_elements = driver.find_elements(By.XPATH, product_xpath)
        log_step(f"Found {len(product_elements)} products for keyword: {keyword}")

        results = empty_results()

        for index, product in enumerate(product_elements, start=1):
            try:
                asin = product.get_attribute('data-asin')

                # Check for sponsored/ad status
                ad_status = "ORGANIC"
                try:
                    ad_div = product.find_element(By.XPATH, ad_indicator_xpath)
                    if ad_div.is_displayed():
                        ad_status = "AD"
                except Exception:
                    pass  # No ad indicator found, keeping status as ORGANIC

                # Match with our target products (exact ASIN comparison)
                for product_name, product_id in products:
                    if product_id == asin:
                        log_step(
                            f"Product '{product_name}' (ID: {product_id}) found at "
                            f"rank {index} with status: {ad_status}"
                        )
                        results[product_id].append((index, ad_status))
            except Exception as e:
                log_step(f"Error processing product at index {index}: {str(e)}")
                continue

        return results
    except Exception as e:
        log_step(f"Error searching for {keyword}: {str(e)}")
        return empty_results()


def update_google_sheet(sheet, all_data):
    """Insert fresh result columns at column D and fill them in.

    One column per location is written, plus one extra column that is left
    empty. Row 2 receives the location, row 3 the run timestamp, and rows 4+
    the rank/status text for each product, in the order given.

    Args:
        sheet: gspread worksheet to update.
        all_data: dict mapping location -> list of
            ``(keyword, product_name, product_id, ranks_statuses)`` tuples,
            where ``ranks_statuses`` is a list of strings.

    Raises:
        Exception: any error is logged and re-raised.
    """
    try:
        log_step("Updating Google Sheet...")

        num_locations = len(all_data)
        sheet.insert_cols(
            [[] for _ in range(num_locations + 1)], col=FIRST_RESULT_COL
        )
        log_step(f"Inserted {num_locations + 1} columns starting at the 4th position.")

        run_timestamp = datetime.now().strftime("%m-%d-%Y %H:%M")
        header_cells = []
        data_cells = []

        for col_offset, (location, data) in enumerate(all_data.items()):
            col_index = FIRST_RESULT_COL + col_offset

            # Add headers
            header_cells.append(gspread.Cell(LOCATION_HEADER_ROW, col_index, location))
            header_cells.append(
                gspread.Cell(TIMESTAMP_HEADER_ROW, col_index, run_timestamp)
            )

            # Add data (start from row 4)
            for row, (_, _, _, ranks_statuses) in enumerate(data, start=FIRST_DATA_ROW):
                # Join multiple rank-status pairs if they exist
                rank_status = " | ".join(ranks_statuses) if ranks_statuses else "50+ / OOS"
                data_cells.append(gspread.Cell(row, col_index, rank_status))

        safe_google_sheets_request(
            sheet.update_cells,
            header_cells + data_cells,
            value_input_option="USER_ENTERED",
        )
        log_step("Google Sheet updated successfully.")
    except Exception as e:
        log_step(f"Error updating Google Sheet: {e}")
        raise  # Re-raise the exception for better error tracking


def main():
    """Run the full scrape: read the sheet, search per location, write back."""
    log_step("Starting script...")

    credentials_file = './credentials.json'
    sheet_name = 'RANKING 2'
    sheet_tab_name = 'Amazon'

    client = get_google_sheets_client(credentials_file)
    # NOTE(review): the worksheet-opening call was missing in the source; it
    # is reconstructed from the otherwise unused names above.
    sheet = client.open(sheet_name).worksheet(sheet_tab_name)

    locations = read_locations(sheet)
    data_grouped_by_keyword = read_data_grouped_by_keyword(sheet)

    driver = create_headless_chrome_driver()
    all_data = {}

    try:
        driver.get(AMAZON_HOME_URL)
        time.sleep(random.uniform(3, 6))

        for location in locations:
            set_location(driver, location)
            time.sleep(random.uniform(3, 6))

            location_data = []

            for keyword, products in data_grouped_by_keyword.items():
                # Add retry mechanism per keyword
                max_retries = 2
                results = {}
                for attempt in range(max_retries):
                    results = get_ranks_and_statuses_for_keyword(driver, keyword, products)
                    if any(results.values()):  # Check if we got valid results
                        break
                    log_step(f"Retrying keyword {keyword} ({attempt+1}/{max_retries})")
                    time.sleep(5)

                for product_name, product_id in products:
                    product_results = results.get(product_id, [(None, None)])
                    rank_status_list = [
                        f"{rank if rank else '50+ | OOS'}-{status if status else ''}"
                        for rank, status in product_results
                    ]
                    location_data.append(
                        (keyword, product_name, product_id, rank_status_list)
                    )

                time.sleep(random.uniform(3, 6))

            all_data[location] = location_data
    finally:
        driver.quit()

    update_google_sheet(sheet, all_data)


if __name__ == "__main__":
    main()