Source code for ppc_robot_lib.steps.input.check_urls

import asyncio
import logging
import socket
import ssl
import time
from collections import defaultdict
from dataclasses import dataclass
from urllib.parse import urlparse, parse_qs, urlencode, urlunsplit, SplitResult
from urllib.parse import urlsplit

import aiodns
import aiohttp
import numpy
import pandas
from aiohttp import client_exceptions
from django.conf import settings
from prometheus_client import Counter, Summary
from yarl import URL

from ppc_robot_lib.redis import create_async_redis_client
from ppc_robot_lib.steps.abstract_step import AbstractStep
from ppc_robot_lib.stores import UserSettingsStore
from ppc_robot_lib.tasks import StepPerformance, TaskContextInterface
from ppc_robot_lib.throttling.crawler_throttling import CrawlerThrottling


logger = logging.getLogger('ppc_robot_lib.url_check')

url_check_total = Counter('robot_url_check_total', 'Total URLs checked.')
url_check_seconds = Summary('robot_url_check_seconds', 'Time spent checking URLs.')
url_check_errors_total = Counter('robot_url_check_errors_total', 'Total errors occurred when checking URLs', ['code'])
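# Pre-create the 'error' label so the error counter is exported even before the first failure.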
url_check_errors_total.labels('error')

USER_AGENT_BASE = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
    'Chrome/88.0.4324.151 Safari/537.36'
)
USER_AGENT_ROBOT_SUFFIX = '(compatible; PPCRobot/1.0; +https://ppc-robot.net/)'

URL_INDEX = 0
CODE_INDEX = 1
REDIRECT_TO_INDEX = 2
ERROR_INDEX = 3


@dataclass
class DomainCheckRecord:
    urls: int = 0
    checked: int = 0
    errors: int = 0


MIN_RESPONSE_TIME = 0.25
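# Lower bound on the expected response time in seconds; the minimum per-task delay is MIN_RESPONSE_TIME / parallel_tasks.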


class CheckUrlsStep(AbstractStep):
    """
    Performs a URL check. Input can be either a :py:class:`pandas.DataFrame`, or
    :py:class:`pandas.core.groupby.DataFrameGroupBy`.

    If you use a DataFrame, you have to set the ``column`` argument to a column with a valid URL.
    Please note that this step does not perform any kind of deduplication, so you might end up
    checking a single URL multiple times.

    If you use a group-by result as the input, the index with group keys is always used as the URL.
    The index cannot be hierarchical, which means the group-by cannot be constructed with multiple
    columns for grouping.

    The output table will contain 4 columns:

    ``url``
        URL being checked.
    ``code``
        HTTP status code. Might be blank if the connection failed and the page was not retrieved.
        Either ``error`` or ``code`` will be present.
    ``redirect_to``
        URL to which the user would be redirected, filled in if the server has returned
        a ``Location`` header.
    ``error``
        Error code if the check failed. Either ``error`` or ``code`` will be present.

    **Example:**

    >>> from ppc_robot_lib.steps.input import CheckUrlsStep
    >>> CheckUrlsStep("urls_grouped", output_table="check_result", use_group=True)

    The ``urls_grouped`` table can be prepared using the
    :py:class:`ppc_robot_lib.steps.transformations.group_by_column.GroupByColumnStep`::

    >>> from ppc_robot_lib.steps.transformations import GroupByColumnStep
    >>> GroupByColumnStep('urls', ['Url'], 'urls_grouped')
    """

    def __init__(
        self,
        input_table,
        output_table,
        use_group=False,
        column: str = None,
        parallel_tasks: int = 4,
        max_conn: int = 16,
        timeout: int = 10,
        hide_robot_suffix=False,
        use_get_requests=False,
        force_https: bool = False,
        check_autotagging_params: bool = False,
    ):
        """
        :param input_table: Input table with URLs. Can be either a :py:class:`pandas.DataFrame`
            or a :py:class:`pandas.core.groupby.DataFrameGroupBy`.
        :param output_table: Output table. Must not exist.
        :param use_group: Set to ``True`` if the input table is a
            :py:class:`pandas.core.groupby.DataFrameGroupBy` instance. Exclusive with the
            ``column`` parameter.
        :param column: Column with the URL from the input table.
        :param parallel_tasks: Maximum number of URLs to check in parallel.
        :param max_conn: Maximum keep-alive connections.
        :param timeout: URL download timeout.
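        :param hide_robot_suffix: If ``True``, send only the plain browser User-Agent without
            the ``PPCRobot`` suffix.
        :param use_get_requests: If ``True``, fetch URLs with ``GET`` requests instead of
            ``HEAD`` requests.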
        :param force_https: If ``True``, report an error when the final URL does not use HTTPS.
        :param check_autotagging_params: If ``True``, append the auto-tagging (``gclid``)
            parameter to each URL and check that it is preserved after redirects.
        """
        if not use_group and not column:
            raise ValueError('You have to set a column which contains the URL when use_group is set to False.')
        elif use_group and column:
            raise ValueError('You cannot use the column attribute when use_group is set to True.')

        self.use_group = use_group
        self.column = column
        self.input_table = input_table
        self.output_table = output_table
        self.parallel_tasks = parallel_tasks
        self.max_conn = max_conn
        self.timeout = timeout
        self.hide_robot_suffix = hide_robot_suffix
        self.use_get_requests = use_get_requests
        self.force_https = force_https
        self.check_autotagging_urls = check_autotagging_params
        self.urls_done = 0
        self.headers = {}
        self.workers_running = 0
        self.retry_sleep_interval = 3
        self.errors = 0
        self.requests = 0
        self.domains: dict[str, DomainCheckRecord] = defaultdict(DomainCheckRecord)
        self.errors_sent = set()
        self.proxy = None
        self.gclid_val: str = 'PPCRobot'
        self._min_delay = MIN_RESPONSE_TIME / parallel_tasks
        self._max_delay = 4 / parallel_tasks
        self._current_delay = 0.5 / parallel_tasks
        self._hostname_ips: dict[str, str] = {}
        self._dns_resolver: aiodns.DNSResolver | None = None

    @staticmethod
    def build_user_agent(hide_suffix):
        if not hide_suffix:
            return f'{USER_AGENT_BASE} {USER_AGENT_ROBOT_SUFFIX}'
        else:
            return USER_AGENT_BASE

    def get_proxy(self, user_id) -> str | None:
        settings_store = UserSettingsStore(user_id)
        use_proxy = settings_store.get('url_check_use_proxy', default=False)
        if use_proxy:
            return getattr(settings, 'PPC_ROBOT_HTTP_PROXY', None)
        else:
            return None

    def execute(self, task_ctx: TaskContextInterface) -> StepPerformance:
        table = task_ctx.work_set.get_table(self.input_table)
        if self.use_group:
            url_iter = table.groups.keys()
        else:
            url_iter = table[self.column]

        # Set the proxy server - valid only for selected users, others will get `proxy=None`.
        if task_ctx.user_credentials and task_ctx.user_credentials.user_id:
            self.proxy = self.get_proxy(task_ctx.user_credentials.user_id)
        else:
            self.proxy = None

        # TODO: Make User-Agent and Languages configurable.
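        # Browser-like headers; the User-Agent includes the PPCRobot suffix unless hide_robot_suffix is set.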
        self.headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'cs,en-US;q=0.7,en;q=0.3',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': self.build_user_agent(self.hide_robot_suffix),
        }

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            new_table = loop.run_until_complete(self.check_urls(url_iter, task_ctx))
        finally:
            loop.close()

        task_ctx.work_set.set_table(self.output_table, new_table)

        return StepPerformance(new_table, rows_out=len(new_table.index))

    async def check_urls(self, url_iterable, task_ctx: TaskContextInterface):
        self._dns_resolver = aiodns.DNSResolver()
        try:
            self.errors = 0
            self.requests = 0
            rows = len(url_iterable)
            if 0 < rows < self.parallel_tasks:
                self.parallel_tasks = rows

            logger.info('Preparing to check %d URLs in %d tasks.', rows, self.parallel_tasks)
            index = numpy.arange(rows)
            result_frame = pandas.concat(
                [
                    pandas.Series(name='url', index=index, dtype=str),
                    pandas.Series(
                        name='code', index=index, dtype=numpy.int16, data=numpy.zeros(rows, dtype=numpy.int16)
                    ),
                    pandas.Series(name='redirect_to', index=index, dtype=object, data=numpy.nan),
                    pandas.Series(name='error', index=index, dtype=object, data=numpy.nan),
                ],
                axis=1,
            )  # type: pandas.DataFrame

            if rows > 0:
                queue = asyncio.Queue(maxsize=self.parallel_tasks)
                conn = aiohttp.TCPConnector(ttl_dns_cache=3600, limit=self.max_conn)
                http_timeout = aiohttp.ClientTimeout(
                    total=self.timeout * 3,
                    sock_connect=self.timeout,
                    sock_read=self.timeout,
                )
                async with aiohttp.ClientSession(
                    connector=conn,
                    raise_for_status=False,
                    timeout=http_timeout,
                ) as session:
                    logger.info(f'Creating {self.parallel_tasks} workers')
                    # Create parallel_tasks workers.
                    workers = [
                        asyncio.create_task(self.check_url(session, queue, result_frame, task_ctx))
                        for _ in range(0, self.parallel_tasks)
                    ]
                    await self.produce_urls(queue, url_iterable)
                    logger.info(f'Added all {rows} URLs to queue: queue size={queue.qsize()}')
                    await self.add_stop_elements(queue)
                    logger.info(f'Added stop elements, joining queue: queue size={queue.qsize()}')
                    # Wait until all items are processed.
                    await queue.join()
                    logger.info(f'Cancelling workers: queue size={queue.qsize()}')
                    # Cancel all workers.
                    for worker in workers:
                        worker.cancel()
                    logger.info('All workers cancelled.')

            per_domain_results = [
                f'{domain}: checked {record.checked} of {record.urls} URLs, found {record.errors} errors'
                for domain, record in self.domains.items()
            ]
            if not per_domain_results:
                per_domain_results.append('no URLs checked')
            logger.info('URL check complete. Per-domain check summary:\n' + '\n'.join(per_domain_results))

            return result_frame
        except Exception:
            logger.exception('Error in URL checker')
            raise

    async def produce_urls(self, queue, url_iterable):
        redis_conn = await create_async_redis_client()
        try:
            crawler_throttling = CrawlerThrottling(redis_conn)
            # Add all URLs to the queue.
            # TODO: Support for robots.txt.
            for i, url in enumerate(url_iterable):
                try:
                    parsed_url = urlparse(url)
                    hostname = parsed_url.hostname
                except ValueError:
                    hostname = None

                if isinstance(hostname, str) and hostname:
                    self.domains[hostname].urls += 1
                    ip_key = await self.resolve_ip_address(hostname)
                    # Wait between requests.
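                    # Throttle per resolved IP address, so hostnames served from the same IP share one rate limit.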
                    effective_delay = self.compute_effective_delay(self.parallel_tasks, self._current_delay)
                    actual_delay = await crawler_throttling.get_next_delay(ip_key, effective_delay)
                    await asyncio.sleep(actual_delay)
                else:
                    hostname = None

                await queue.put((i, url, hostname))

            await redis_conn.aclose()
        except Exception:
            await redis_conn.aclose()
            raise

    @classmethod
    def compute_effective_delay(cls, concurrency: int, delay: float) -> float:
        """
        Computes a new delay. The formula in this method decreases the concurrency degree for
        slower websites by stretching the delay.

        :param concurrency: Current concurrency degree (number of parallel tasks).
        :param delay: Current expected delay.
        :return: New effective delay.
        """
        # 0.5 for concurrency = 1, 1 for concurrency = 4, 2 for concurrency = 10
        min_concurrency = 0.5 + (concurrency - 1) / 6
        min_c_time = 4.0
        min_c_delay = min_c_time / concurrency
        multiplier = (concurrency / min_concurrency - 1) / (min_c_delay - MIN_RESPONSE_TIME / concurrency)
        return delay * (1 + multiplier * (delay - MIN_RESPONSE_TIME / concurrency))

    async def resolve_ip_address(self, hostname: str) -> str:
        """
        Tries to resolve the given hostname to an IPv4 address. If there are multiple IP addresses
        for the given hostname, one will be chosen at random. In case of an error, the hostname
        itself will be returned.

        :param hostname: Hostname.
        :return: IP address or the hostname itself.
        """
        if hostname in self._hostname_ips:
            return self._hostname_ips[hostname]

        try:
            dns_response = await self._dns_resolver.gethostbyname(hostname, socket.AF_INET)
            if dns_response and dns_response.addresses:
                result = dns_response.addresses[0]  # The addresses are returned in random order.
                logger.info(f'DNS: {hostname} resolved to {result}')
            else:
                result = hostname
                logger.info(f'DNS: {hostname} has no IPv4 address')
        except aiodns.error.DNSError:
            logger.warning(f'DNS: Failed to resolve {hostname}', exc_info=True)
            result = hostname

        self._hostname_ips[hostname] = result
        return result

    async def add_stop_elements(self, queue):
        # Add stop element for each parallel task.
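        # A (None, None, None) item tells a worker to stop consuming from the queue.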
        for _ in range(0, self.parallel_tasks):
            await queue.put((None, None, None))

    async def check_url(
        self,
        session: aiohttp.ClientSession,
        queue: asyncio.Queue,
        results: pandas.DataFrame,
        task_ctx: TaskContextInterface,
    ):
        self.workers_running += 1
        if self.use_get_requests:
            method = session.get
        else:
            method = session.head

        try:
            while True:
                index, url, hostname = await queue.get()
                try:
                    if index is None:
                        break

                    url_check_total.inc()
                    self.domains[hostname].checked += 1
                    results.iat[index, URL_INDEX] = url
                    can_retry = True
                    error = None
                    retries = 0
                    max_retries = 3
                    if self.check_autotagging_urls:
                        url = self.add_autotagging_param(url)

                    while can_retry:
                        error = None
                        retries += 1
                        status = None
                        start_time = time.monotonic()
                        try:
                            self.requests += 1
                            if retries > 1:
                                logger.info('Retry #%d for URL %s', retries, url)
                            with url_check_seconds.time():
                                async with method(
                                    url,
                                    headers=self.headers,
                                    allow_redirects=True,
                                    timeout=self.timeout,
                                    proxy=self.proxy,
                                ) as response:  # type: aiohttp.ClientResponse
                                    can_retry = False
                                    status = response.status
                                    results.iat[index, CODE_INDEX] = status
                                    if response.history:
                                        location = str(response.url)
                                    else:
                                        location = response.headers.get('Location', None)

                                    if status >= 400:
                                        url_check_errors_total.labels(response.status).inc()
                                    else:
                                        if self.check_autotagging_urls:
                                            param_present = self.check_autotagging_param(response.url)
                                            if not param_present:
                                                error = 'Auto-Tagging Parameter Removed'
                                                results.iat[index, CODE_INDEX] = 0
                                        if self.force_https and error is None:
                                            if response.history:
                                                if response.url.scheme == 'https':
                                                    results.iat[index, CODE_INDEX] = response.history[0].status
                                                else:
                                                    error = 'Redirect URL Not Secure (HTTP)'
                                                    results.iat[index, CODE_INDEX] = 0
                                            else:
                                                if response.url.scheme == 'https':
                                                    results.iat[index, CODE_INDEX] = response.status
                                                else:
                                                    error = 'Not Secure (HTTP)'
                                                    results.iat[index, CODE_INDEX] = 0

                                    if location:
                                        results.iat[index, REDIRECT_TO_INDEX] = location
                        except GeneratorExit:
                            raise
                        except client_exceptions.TooManyRedirects:
                            error = 'Redirect Loop'
                        except (aiohttp.ServerTimeoutError, TimeoutError):
                            error = 'Timeout while fetching URL.'
                        except aiohttp.ServerDisconnectedError:
                            error = 'Server disconnected during the connection.'
                        except (aiohttp.ServerConnectionError, aiohttp.ClientConnectorError):
                            error = 'Error while connecting to the server.'
                        except aiohttp.ClientResponseError as client_resp_exc:
                            can_retry = False
                            error = 'Invalid response retrieved.'
                            self.log_url_exception(task_ctx, url, client_resp_exc)
                        except aiohttp.ClientPayloadError as client_payload_exc:
                            can_retry = False
                            error = 'Malformed response retrieved.'
                            self.log_url_exception(task_ctx, url, client_payload_exc)
                        except ssl.CertificateError as cert_exc:
                            can_retry = False
                            error = f'Certificate error: {str(cert_exc)}'
                        except aiohttp.InvalidURL:
                            can_retry = False
                            error = 'Invalid URL'
                        except ValueError as exc:
                            can_retry = False
                            error = str(exc)
                        except Exception:
                            can_retry = False
                            logger.exception('Unknown error occurred.')
                            error = 'Unknown error occurred.'

                        latency = time.monotonic() - start_time
                        self._current_delay = self.compute_next_delay(status, latency)

                        if error is not None:
                            url_check_errors_total.labels('error').inc()
                            self.domains[hostname].errors += 1
                            self.errors += 1
                            if can_retry:
                                # Maximum error ratio is determined by the total number of requests made so far.
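                                # The allowed ratio tightens as more requests are made; see get_allowed_error_ratio().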
                                allowed_error_ratio = self.get_allowed_error_ratio(self.requests)
                                if retries >= max_retries or (self.errors / self.requests) >= allowed_error_ratio:
                                    can_retry = False
                                else:
                                    await asyncio.sleep(self.retry_sleep_interval * max(1.0, self._current_delay))

                    if error is not None:
                        results.iat[index, ERROR_INDEX] = error

                    self.urls_done += 1
                    if self.urls_done % 100 == 0:
                        logger.info('%d/%d URLs checked.', self.urls_done, len(results.index))
                finally:
                    queue.task_done()
        except GeneratorExit:
            raise
        except Exception:
            logger.exception('Internal PPC Robot error.')
        finally:
            self.workers_running -= 1
            # If I am the last worker and the queue is still not empty, fill the remaining URLs with errors.
            if self.workers_running < 1 and queue.qsize() > 0:
                logger.warning('Cleaning up the queue!')
                while True:
                    index, url, hostname = await queue.get()
                    try:
                        if index is None:
                            break
                        results.iat[index, URL_INDEX] = url
                        results.iat[index, ERROR_INDEX] = 'Internal PPC Robot error.'
                    finally:
                        queue.task_done()

    def compute_next_delay(self, status: int | None, latency: float) -> float:
        target_delay = latency / self.parallel_tasks
        next_delay = (self._current_delay + target_delay) / 2
        # Non-successful responses are not allowed to decrease the delay.
        if status is None or status < 200 or status >= 300:
            next_delay = max(next_delay, self._current_delay)

        if next_delay < self._min_delay:
            next_delay = self._min_delay
        elif next_delay > self._max_delay:
            next_delay = self._max_delay

        return next_delay

    def add_autotagging_param(self, url: str) -> str:
        """
        Adds the ``gclid`` parameter to the given URL.

        :param url: Input URL.
        :return: URL extended with the auto-tagging parameter.
        """
        url_split: SplitResult = urlsplit(url)
        query = url_split[3]
        query_dict = parse_qs(query)
        query_dict['gclid'] = self.gclid_val
        new_query = urlencode(query_dict, doseq=True)
        new_url = list(url_split)
        new_url[3] = new_query
        return urlunsplit(new_url)

    def check_autotagging_param(self, url: URL) -> bool:
        """
        Checks whether the auto-tagging parameter is present in the URL and still has its
        original value.

        :param url: Input URL to check.
        :return: ``True`` if the parameter is present and unchanged.
        """
        result = False
        if 'gclid' in url.query:
            if url.query['gclid'] == self.gclid_val:
                result = True
        return result

    @staticmethod
    def get_allowed_error_ratio(requests) -> float:
        """
        Gets the maximum allowed error ratio based on the number of requests made so far.
        More requests mean a lower allowed error ratio.

        :param requests: Number of requests made so far.
        :return: Allowed error ratio between 0.0 and 1.0.
        """
        if requests >= 250:
            return 0.05
        elif requests >= 125:
            return 0.1
        elif requests >= 25:
            return 0.25
        else:
            return 1.0

    def log_url_exception(self, task_ctx: TaskContextInterface, url: str, exc: Exception):
        """
        Logs an exception that was caught when retrieving the URL, but only if this was the first
        occurrence of this exception type.

        :param task_ctx: Task context.
        :param url: URL.
        :param exc: Exception instance.
        """
        if type(exc) not in self.errors_sent:
            logger.exception(
                f'An exception of type {str(type(exc))} was caught when retrieving URL {url}. '
                'Only the first occurrence of this exception type is logged.'
            )
            if task_ctx.error_reporter:
                task_ctx.error_reporter(exc)
            self.errors_sent.add(type(exc))