development:python:selemium
Differences
This shows you the differences between two versions of the page.
Next revision | Previous revision | ||
development:python:selemium [2024/08/07 02:28] – created tungnt | development:python:selemium [2024/08/15 15:40] (current) – tungnt | ||
---|---|---|---|
Line 1: | Line 1: | ||
- | ====== | + | ====== |
+ | ===== Chờ element có ID load xong ===== | ||
+ | |||
+ | |||
+ | <file python> | ||
+ | WebDriverWait(driver, | ||
+ | EC.presence_of_element_located((By.ID, | ||
+ | ) | ||
+ | </ | ||
+ | |||
+ | ===== Kiểm tra element có text tương ứng không ===== | ||
+ | |||
+ | <file python> | ||
+ | |||
+ | WebDriverWait(driver, | ||
+ | EC.text_to_be_present_in_element((By.XPATH, | ||
+ | </ | ||
+ | |||
+ | ===== Chờ một overlay load xong và ẩn đi ===== | ||
+ | |||
+ | <file python> | ||
+ | WebDriverWait(driver, | ||
+ | EC.invisibility_of_element_located((By.ID, | ||
+ | ) | ||
+ | </ | ||
+ | |||
+ | ===== Chạy Javascript ===== | ||
+ | |||
+ | <file python> | ||
+ | driver.execute_script(''' | ||
+ | var fDate = $("# | ||
+ | var tDate = $("# | ||
+ | getTranHisData($("# | ||
+ | ''' | ||
+ | </ | ||
+ | |||
+ | <file python> | ||
+ | driver.execute_script(''' | ||
+ | var c = document.createElement(' | ||
+ | var img = document.getElementById(' | ||
+ | c.height = img.naturalHeight; | ||
+ | c.width = img.naturalWidth; | ||
+ | var ctx = c.getContext(' | ||
+ | ctx.drawImage(img, | ||
+ | var base64String = c.toDataURL(); | ||
+ | var c_detect = document.createElement(' | ||
+ | c_detect.setAttribute(" | ||
+ | c_detect.innerHTML = base64String; | ||
+ | document.body.appendChild(c_detect); | ||
+ | ''' | ||
+ | </ | ||
+ | |||
+ | ===== Lấy element có thể click được ===== | ||
+ | |||
+ | <file python> | ||
+ | element = WebDriverWait(driver, | ||
+ | EC.element_to_be_clickable((By.XPATH, | ||
+ | ) | ||
+ | |||
+ | element.click() | ||
+ | </ | ||
+ | |||
+ | ====== Ví dụ ====== | ||
+ | |||
+ | **Thư viện:** | ||
+ | |||
+ | <file python browser.py> | ||
+ | import time | ||
+ | from selenium import webdriver | ||
+ | from selenium.webdriver.chrome.options import Options | ||
+ | from selenium.common.exceptions import NoSuchElementException, | ||
+ | from selenium.webdriver.support.ui import WebDriverWait | ||
+ | from selenium.webdriver.support import expected_conditions as EC | ||
+ | from selenium.webdriver.common.by import By | ||
+ | |||
+ | from django.conf import settings | ||
+ | |||
+ | class Browser: | ||
+ | download_path = None | ||
+ | |||
+ | driver = None | ||
+ | |||
+ | def __init__(self, | ||
+ | self.download_path = download_path | ||
+ | |||
+ | self.set_driver() | ||
+ | |||
+ | def set_driver(self): | ||
+ | options = Options() | ||
+ | options.headless = True | ||
+ | options.add_argument(' | ||
+ | options.add_argument(' | ||
+ | options.add_argument(" | ||
+ | options.add_argument(' | ||
+ | options.add_argument(' | ||
+ | |||
+ | #if settings.APP_ENV == ' | ||
+ | # | ||
+ | |||
+ | options.add_argument(" | ||
+ | options.add_argument(" | ||
+ | options.add_argument(' | ||
+ | options.add_experimental_option(" | ||
+ | " | ||
+ | " | ||
+ | " | ||
+ | " | ||
+ | " | ||
+ | }) | ||
+ | options.add_argument(' | ||
+ | options.add_argument(' | ||
+ | # | ||
+ | |||
+ | options.add_argument(" | ||
+ | options.add_argument(" | ||
+ | |||
+ | chrome_driver_path = settings.RESOURCE_PATH + '/ | ||
+ | |||
+ | # | ||
+ | self.driver = webdriver.Chrome(options=options, | ||
+ | |||
+ | def get_driver(self): | ||
+ | return self.driver | ||
+ | |||
+ | def destroy_driver(self): | ||
+ | self.driver.quit() | ||
+ | |||
+ | @staticmethod | ||
+ | def click_by_xpath(driver, | ||
+ | element = WebDriverWait(driver, | ||
+ | EC.element_to_be_clickable((By.XPATH, | ||
+ | ) | ||
+ | |||
+ | element.click() | ||
+ | |||
+ | @staticmethod | ||
+ | def download_excel(self, | ||
+ | seconds = 0 | ||
+ | has_file_download = False | ||
+ | while has_file_download == False and seconds < timeout: | ||
+ | time.sleep(1) | ||
+ | |||
+ | if download_func != None: | ||
+ | has_file_download = download_func() | ||
+ | |||
+ | seconds += 1 | ||
+ | return has_file_download | ||
+ | </ | ||
+ | |||
+ | **Ví dụ:** | ||
+ | |||
+ | <file python mb.py> | ||
+ | import logging | ||
+ | import datetime | ||
+ | import sys | ||
+ | import time | ||
+ | import hashlib | ||
+ | import traceback | ||
+ | |||
+ | from bank.services.captcha import Captcha | ||
+ | from selenium.common.exceptions import NoSuchElementException, | ||
+ | from selenium.webdriver.support.ui import WebDriverWait | ||
+ | from selenium.webdriver.support import expected_conditions as EC | ||
+ | from selenium.webdriver.common.by import By | ||
+ | from django.conf import settings | ||
+ | from django.core.management.base import BaseCommand | ||
+ | from bank.services.browser import Browser | ||
+ | from bank.services.helper import Helper | ||
+ | |||
+ | logger = logging.getLogger(' | ||
+ | class Command(BaseCommand): | ||
+ | help = 'Crawl MBBank IB' | ||
+ | bank_no = ' | ||
+ | |||
+ | account_no_1 = ' | ||
+ | account_no_2 = ' | ||
+ | account_no_3 = ' | ||
+ | account_no_4 = ' | ||
+ | account_no_5 = ' | ||
+ | |||
+ | def __init__(self): | ||
+ | self.download_path = settings.DATA_PATH + "/ | ||
+ | |||
+ | def get_transactions(self, | ||
+ | transactions = [] | ||
+ | |||
+ | current_page = 1 | ||
+ | page = 1 | ||
+ | while page < 100000000: | ||
+ | time.sleep(5) | ||
+ | |||
+ | element = self.check_exists_by_xpath(driver, | ||
+ | if element: | ||
+ | current_page = int(element.find_element(By.CLASS_NAME, | ||
+ | print(' | ||
+ | |||
+ | if page > current_page: | ||
+ | break | ||
+ | |||
+ | page += 1 | ||
+ | |||
+ | trans_elements = driver.find_elements(By.XPATH, | ||
+ | |||
+ | # logger.error(trans_elements) | ||
+ | |||
+ | for element in trans_elements: | ||
+ | bank_no = self.bank_no | ||
+ | reference_id = element.find_element(By.XPATH, | ||
+ | trans_time_o = element.find_element(By.XPATH, | ||
+ | trans_time = trans_time_o | ||
+ | |||
+ | trans_time = datetime.datetime.strptime(trans_time_o, | ||
+ | |||
+ | amoutnOut = element.find_element(By.XPATH, | ||
+ | amoutnIn = element.find_element(By.XPATH, | ||
+ | |||
+ | amoutnOut = int(amoutnOut.replace(',', | ||
+ | amoutnIn = int(amoutnIn.replace(',', | ||
+ | |||
+ | amount = 0 | ||
+ | if amoutnOut : | ||
+ | amount = amoutnOut | ||
+ | |||
+ | if amoutnIn: | ||
+ | amount = amoutnIn | ||
+ | |||
+ | balance = element.find_element(By.XPATH, | ||
+ | balance = balance.replace(' | ||
+ | content = element.find_element(By.XPATH, | ||
+ | content = content.strip() | ||
+ | |||
+ | if account_no != self.account_no_4: | ||
+ | balance = 0 | ||
+ | | ||
+ | checksum = hashlib.md5( | ||
+ | (str(reference_id) + self.bank_no + account_no + str(amount) + str(balance) + content + trans_time_o).encode( | ||
+ | ' | ||
+ | |||
+ | transaction = { | ||
+ | ' | ||
+ | ' | ||
+ | ' | ||
+ | ' | ||
+ | ' | ||
+ | ' | ||
+ | ' | ||
+ | ' | ||
+ | ' | ||
+ | ' | ||
+ | ' | ||
+ | } | ||
+ | |||
+ | print(transaction) | ||
+ | |||
+ | size = len(driver.find_elements(By.XPATH, | ||
+ | |||
+ | size = size - 1 | ||
+ | |||
+ | return transactions | ||
+ | |||
+ | def check_exists_by_xpath(self, | ||
+ | try: | ||
+ | ele = driver.find_element(By.XPATH, | ||
+ | except NoSuchElementException: | ||
+ | return False | ||
+ | return ele | ||
+ | |||
+ | def getAccountSelect(self, | ||
+ | if account_no == self.account_no_1: | ||
+ | return '// | ||
+ | elif account_no == self.account_no_2: | ||
+ | return '// | ||
+ | elif account_no == self.account_no_3: | ||
+ | return '// | ||
+ | elif account_no == self.account_no_4: | ||
+ | return '// | ||
+ | elif account_no == self.account_no_5: | ||
+ | return '// | ||
+ | else: | ||
+ | return "" | ||
+ | |||
+ | def get_account(self, | ||
+ | try: | ||
+ | logging.getLogger(' | ||
+ | |||
+ | transaction_url = " | ||
+ | |||
+ | driver.get(transaction_url) | ||
+ | |||
+ | WebDriverWait(driver, | ||
+ | lambda driver: len(driver.find_element(By.XPATH, | ||
+ | |||
+ | WebDriverWait(driver, | ||
+ | EC.presence_of_element_located((By.ID, | ||
+ | ) | ||
+ | |||
+ | time.sleep(5) | ||
+ | |||
+ | # driver.save_screenshot(settings.LOGS_PATH + '/ | ||
+ | | ||
+ | self.click_by_xpath(driver, | ||
+ | |||
+ | # driver.save_screenshot(settings.LOGS_PATH + '/ | ||
+ | |||
+ | self.click_by_xpath(driver, | ||
+ | |||
+ | driver.save_screenshot(settings.LOGS_PATH + '/ | ||
+ | |||
+ | tod = datetime.datetime.now() | ||
+ | to_date = tod.strftime(" | ||
+ | date_index = int(to_date) -1 | ||
+ | |||
+ | self.click_by_xpath(driver, | ||
+ | |||
+ | time.sleep(5) | ||
+ | | ||
+ | driver.execute_script(" | ||
+ | driver.execute_script(" | ||
+ | driver.execute_script(" | ||
+ | |||
+ | |||
+ | driver.find_element(By.XPATH, | ||
+ | |||
+ | WebDriverWait(driver, | ||
+ | EC.presence_of_element_located((By.ID, | ||
+ | ) | ||
+ | |||
+ | self.get_transactions(driver, | ||
+ | except Exception as e: | ||
+ | print(' | ||
+ | |||
+ | logger.error(traceback.format_exc()) | ||
+ | |||
+ | def login(self, driver): | ||
+ | current_url = driver.current_url | ||
+ | |||
+ | driver.save_screenshot(settings.LOGS_PATH + '/ | ||
+ | |||
+ | WebDriverWait(driver, | ||
+ | EC.presence_of_element_located((By.CLASS_NAME, | ||
+ | ) | ||
+ | |||
+ | # | ||
+ | captcha_base64 = driver.find_element(By.CSS_SELECTOR, | ||
+ | |||
+ | captcha = Captcha() | ||
+ | captcha_txt = captcha.request(captcha_base64) | ||
+ | |||
+ | print(captcha_txt) | ||
+ | |||
+ | if captcha_txt == "": | ||
+ | driver.quit() | ||
+ | |||
+ | driver.find_element(By.XPATH, | ||
+ | driver.find_element(By.XPATH, | ||
+ | driver.find_element(By.XPATH, | ||
+ | driver.find_element(By.XPATH, | ||
+ | | ||
+ | driver.save_screenshot(settings.LOGS_PATH + '/ | ||
+ | | ||
+ | driver.find_element(By.XPATH, | ||
+ | |||
+ | driver.save_screenshot(settings.LOGS_PATH + '/ | ||
+ | |||
+ | WebDriverWait(driver, | ||
+ | |||
+ | driver.save_screenshot(settings.LOGS_PATH + '/ | ||
+ | |||
+ | return driver | ||
+ | |||
+ | def crawl(self): | ||
+ | browser = Browser(self.download_path) | ||
+ | |||
+ | driver = browser.get_driver() | ||
+ | |||
+ | driver.get(" | ||
+ | |||
+ | current_url = driver.current_url | ||
+ | |||
+ | try: | ||
+ | self.login(driver) | ||
+ | |||
+ | self.get_account(driver, | ||
+ | | ||
+ | self.get_account(driver, | ||
+ | self.get_account(driver, | ||
+ | self.get_account(driver, | ||
+ | | ||
+ | self.get_account(driver, | ||
+ | |||
+ | except Exception as e: | ||
+ | print(' | ||
+ | |||
+ | logger.error(traceback.format_exc()) | ||
+ | |||
+ | self.quit(driver) | ||
+ | |||
+ | def click_by_xpath(self, | ||
+ | element = WebDriverWait(driver, | ||
+ | EC.element_to_be_clickable((By.XPATH, | ||
+ | ) | ||
+ | |||
+ | element.click() | ||
+ | |||
+ | def handle(self, | ||
+ | if Helper.count_process_exists(' | ||
+ | logging.getLogger(' | ||
+ | |||
+ | print(" | ||
+ | |||
+ | sys.exit() | ||
+ | |||
+ | self.crawl() | ||
+ | |||
+ | def quit(self, driver): | ||
+ | time.sleep(2) | ||
+ | driver.close() | ||
+ | driver.quit() | ||
+ | </ | ||
+ | |||
+ | |||
+ | |
development/python/selemium.1722997723.txt.gz · Last modified: 2024/08/07 02:28 by tungnt