import asyncio
import logging
import socket
import ssl
import time
from collections import defaultdict
from dataclasses import dataclass
from urllib.parse import SplitResult, parse_qs, urlencode, urlparse, urlsplit, urlunsplit
import aiodns
import aiohttp
import numpy
import pandas
from aiohttp import client_exceptions
from django.conf import settings
from prometheus_client import Counter, Summary
from yarl import URL
from ppc_robot_lib.redis import create_async_redis_client
from ppc_robot_lib.steps.abstract_step import AbstractStep
from ppc_robot_lib.stores import UserSettingsStore
from ppc_robot_lib.tasks import StepPerformance, TaskContextInterface
from ppc_robot_lib.throttling.crawler_throttling import CrawlerThrottling
logger = logging.getLogger('ppc_robot_lib.url_check')
url_check_total = Counter('robot_url_check_total', 'Total URLs checked.')
url_check_seconds = Summary('robot_url_check_seconds', 'Time spent checking URLs.')
url_check_errors_total = Counter('robot_url_check_errors_total', 'Total errors that occurred when checking URLs.', ['code'])
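# Pre-create the 'error' label so the corresponding time series is exported even before the first failure.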
url_check_errors_total.labels('error')
USER_AGENT_BASE = (
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/88.0.4324.151 Safari/537.36'
)
USER_AGENT_ROBOT_SUFFIX = '(compatible; PPCRobot/1.0; +https://ppc-robot.net/)'
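# Column positions in the output DataFrame, used with DataFrame.iat in check_url().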
URL_INDEX = 0
CODE_INDEX = 1
REDIRECT_TO_INDEX = 2
ERROR_INDEX = 3
@dataclass
class DomainCheckRecord:
urls: int = 0
checked: int = 0
errors: int = 0
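# Minimum expected server response time in seconds; used to derive the lower bound of the per-worker delay.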
MIN_RESPONSE_TIME = 0.25
class CheckUrlsStep(AbstractStep):
"""
Performs a URL check.
Input can be either a :py:class:`pandas.DataFrame`, or :py:class:`pandas.core.groupby.DataFrameGroupBy`.
If you use a DataFrame, you have to set the ``column`` argument to a column with a valid URL. Please note
that this step does not perform any kind of deduplication, so you might end up checking a single URL
multiple times.
If you use a group-by result as the input, the index with group keys is always used as the URL. The index cannot
be hierarchical, which means the group-by cannot be constructed with multiple grouping columns.
The output table will contain 4 columns:
``url``
URL being checked.
``code``
HTTP status code. Might be blank if the connection failed and the page was not retrieved.
Either ``error`` or ``code`` will be present.
``redirect_to``
URL to which the user would be redirected, filled in if the server returned a ``Location`` header.
``error``
Error code if the check failed. Either ``error`` or ``code`` will be present.
**Example:**
>>> from ppc_robot_lib.steps.input import CheckUrlsStep
>>> CheckUrlsStep("urls_grouped", output_table="check_result", use_group=True)
The ``urls_grouped`` can be prepared using the
:py:class:`ppc_robot_lib.steps.transformations.group_by_column.GroupByColumnStep`::
>>> from ppc_robot_lib.steps.transformations import GroupByColumnStep
>>> GroupByColumnStep('urls', ['Url'], 'urls_grouped')
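If the input is a plain :py:class:`pandas.DataFrame`, pass the URL column name instead of ``use_group``
(the column name ``Url`` below is only illustrative):
>>> CheckUrlsStep('urls', output_table='check_result', column='Url')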
"""
def __init__(
self,
input_table,
output_table,
use_group=False,
column: str = None,
parallel_tasks: int = 4,
max_conn: int = 16,
timeout: int = 10,
hide_robot_suffix=False,
use_get_requests=False,
force_https: bool = False,
check_autotagging_params: bool = False,
):
"""
:param input_table: Input table with URLs. Can be either a :py:class:`pandas.DataFrame` or a
:py:class:`pandas.core.groupby.DataFrameGroupBy`.
:param output_table: Output table. Must not exist.
:param use_group: Set to ``True`` if the input table is a :py:class:`pandas.core.groupby.DataFrameGroupBy`
instance. Exclusive with the ``column`` parameter.
:param column: Column with URL from the input table.
:param parallel_tasks: Maximum number of URLs to check in parallel.
:param max_conn: Maximum keep-alive connections.
:param timeout: URL download timeout in seconds.
:param hide_robot_suffix: Do not append the PPC Robot identification to the ``User-Agent`` header.
:param use_get_requests: Use ``GET`` requests instead of ``HEAD``.
:param force_https: If ``True``, report an error when the final URL does not use HTTPS.
:param check_autotagging_params: If ``True``, append an auto-tagging (``gclid``) parameter to each URL and
verify that it is preserved after redirects.
"""
if not use_group and not column:
raise ValueError('You have to set the column which contains the URL when use_group is set to False.')
elif use_group and column:
raise ValueError('You cannot use the column attribute when use_group is set to True.')
self.use_group = use_group
self.column = column
self.input_table = input_table
self.output_table = output_table
self.parallel_tasks = parallel_tasks
self.max_conn = max_conn
self.timeout = timeout
self.hide_robot_suffix = hide_robot_suffix
self.use_get_requests = use_get_requests
self.force_https = force_https
self.check_autotagging_urls = check_autotagging_params
self.urls_done = 0
self.headers = {}
self.workers_running = 0
self.retry_sleep_interval = 3
self.errors = 0
self.requests = 0
self.domains: dict[str, DomainCheckRecord] = defaultdict(DomainCheckRecord)
self.errors_sent = set()
self.proxy = None
self.gclid_val: str = 'PPCRobot'
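# Per-worker delay bounds and the initial delay, all scaled down by the number of parallel tasks.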
self._min_delay = MIN_RESPONSE_TIME / parallel_tasks
self._max_delay = 4 / parallel_tasks
self._current_delay = 0.5 / parallel_tasks
self._hostname_ips: dict[str, str] = {}
self._dns_resolver: aiodns.DNSResolver | None = None
@staticmethod
def build_user_agent(hide_suffix):
if not hide_suffix:
return f'{USER_AGENT_BASE} {USER_AGENT_ROBOT_SUFFIX}'
else:
return USER_AGENT_BASE
def get_proxy(self, user_id) -> str | None:
settings_store = UserSettingsStore(user_id)
use_proxy = settings_store.get('url_check_use_proxy', default=False)
if use_proxy:
return getattr(settings, 'PPC_ROBOT_HTTP_PROXY', None)
else:
return None
def execute(self, task_ctx: TaskContextInterface) -> StepPerformance:
table = task_ctx.work_set.get_table(self.input_table)
if self.use_group:
url_iter = table.groups.keys()
else:
url_iter = table[self.column]
# Set the proxy server - valid only for selected users, others will get `proxy=None`.
if task_ctx.user_credentials and task_ctx.user_credentials.user_id:
self.proxy = self.get_proxy(task_ctx.user_credentials.user_id)
else:
self.proxy = None
# TODO: Make User-Agent and Languages configurable.
self.headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'cs,en-US;q=0.7,en;q=0.3',
'Upgrade-Insecure-Requests': '1',
'User-Agent': self.build_user_agent(self.hide_robot_suffix),
}
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
new_table = loop.run_until_complete(self.check_urls(url_iter, task_ctx))
finally:
loop.close()
task_ctx.work_set.set_table(self.output_table, new_table)
return StepPerformance(new_table, rows_out=len(new_table.index))
async def check_urls(self, url_iterable, task_ctx: TaskContextInterface):
self._dns_resolver = aiodns.DNSResolver()
try:
self.errors = 0
self.requests = 0
rows = len(url_iterable)
if 0 < rows < self.parallel_tasks:
self.parallel_tasks = rows
logger.info('Preparing to check %d URLs in %d tasks.', rows, self.parallel_tasks)
index = numpy.arange(rows)
result_frame = pandas.concat(
[
pandas.Series(name='url', index=index, dtype=str),
pandas.Series(
name='code', index=index, dtype=numpy.int16, data=numpy.zeros(rows, dtype=numpy.int16)
),
pandas.Series(name='redirect_to', index=index, dtype=object, data=numpy.nan),
pandas.Series(name='error', index=index, dtype=object, data=numpy.nan),
],
axis=1,
) # type: pandas.DataFrame
if rows > 0:
queue = asyncio.Queue(maxsize=self.parallel_tasks)
conn = aiohttp.TCPConnector(ttl_dns_cache=3600, limit=self.max_conn)
http_timeout = aiohttp.ClientTimeout(
total=self.timeout * 3,
sock_connect=self.timeout,
sock_read=self.timeout,
)
async with aiohttp.ClientSession(
connector=conn,
raise_for_status=False,
timeout=http_timeout,
) as session:
logger.info(f'Creating {self.parallel_tasks} workers')
# Create parallel_tasks workers.
workers = [
asyncio.create_task(self.check_url(session, queue, result_frame, task_ctx))
for _ in range(0, self.parallel_tasks)
]
await self.produce_urls(queue, url_iterable)
logger.info(f'Added all {rows} URLs to queue: queue size={queue.qsize()}')
await self.add_stop_elements(queue)
logger.info(f'Added stop elements, joining queue: queue size={queue.qsize()}')
# Wait until all items are processed.
await queue.join()
logger.info(f'Cancelling workers: queue size={queue.qsize()}')
# Cancel all workers.
for worker in workers:
worker.cancel()
logger.info('All workers cancelled.')
per_domain_results = [
f'{domain}: checked {record.checked} of {record.urls} URLs, found {record.errors} errors'
for domain, record in self.domains.items()
]
if not per_domain_results:
per_domain_results.append('no URLs checked')
logger.info('URL check complete. Per-domain check summary:\n%s', '\n'.join(per_domain_results))
return result_frame
except Exception:
logger.exception('Error in URL checker')
raise
async def produce_urls(self, queue, url_iterable):
redis_conn = await create_async_redis_client()
try:
crawler_throttling = CrawlerThrottling(redis_conn)
# Add all URLs to the queue.
# TODO: Support for robots.txt.
for i, url in enumerate(url_iterable):
try:
parsed_url = urlparse(url)
hostname = parsed_url.hostname
except ValueError:
hostname = None
if isinstance(hostname, str) and hostname:
self.domains[hostname].urls += 1
ip_key = await self.resolve_ip_address(hostname)
# Wait between requests.
effective_delay = self.compute_effective_delay(self.parallel_tasks, self._current_delay)
actual_delay = await crawler_throttling.get_next_delay(ip_key, effective_delay)
await asyncio.sleep(actual_delay)
else:
hostname = None
await queue.put((i, url, hostname))
await redis_conn.aclose()
except Exception:
await redis_conn.aclose()
raise
@classmethod
def compute_effective_delay(cls, concurrency: int, delay: float) -> float:
"""
Computes a new delay. The formula in this method decreases the effective concurrency for slower websites
by stretching the delay.
:param concurrency: Number of parallel tasks.
:param delay: Current expected delay.
:return: New effective delay.
"""
# 0.5 for concurrency = 1, 1 for concurrency = 4, 2 for concurrency = 10
min_concurrency = 0.5 + (concurrency - 1) / 6
min_c_time = 4.0
min_c_delay = min_c_time / concurrency
multiplier = (concurrency / min_concurrency - 1) / (min_c_delay - MIN_RESPONSE_TIME / concurrency)
return delay * (1 + multiplier * (delay - MIN_RESPONSE_TIME / concurrency))
async def resolve_ip_address(self, hostname: str) -> str:
"""
Tries to resolve the given hostname to an IPv4 address.
If there are multiple IP addresses for the given hostname, one will be chosen at random.
In case of an error, the hostname itself will be returned.
:param hostname: Hostname.
:return: IP address or the hostname itself.
"""
if hostname in self._hostname_ips:
return self._hostname_ips[hostname]
try:
dns_response = await self._dns_resolver.gethostbyname(hostname, socket.AF_INET)
if dns_response and dns_response.addresses:
result = dns_response.addresses[0] # The addresses are returned in random order.
logger.info(f'DNS: {hostname} resolved to {result}')
else:
result = hostname
logger.info(f'DNS: {hostname} has no IPv4 address')
except aiodns.error.DNSError:
logger.warning(f'DNS: Failed to resolve {hostname}', exc_info=True)
result = hostname
self._hostname_ips[hostname] = result
return result
async def add_stop_elements(self, queue):
# Add stop element for each parallel task.
for _ in range(0, self.parallel_tasks):
await queue.put((None, None, None))
async def check_url(
self,
session: aiohttp.ClientSession,
queue: asyncio.Queue,
results: pandas.DataFrame,
task_ctx: TaskContextInterface,
):
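# Worker coroutine: consumes (index, url, hostname) tuples from the queue and writes the result of each
# check into the shared `results` DataFrame. A (None, None, None) tuple is the stop element that ends the worker.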
self.workers_running += 1
if self.use_get_requests:
method = session.get
else:
method = session.head
try:
while True:
index, url, hostname = await queue.get()
try:
if index is None:
break
url_check_total.inc()
self.domains[hostname].checked += 1
results.iat[index, URL_INDEX] = url
can_retry = True
error = None
retries = 0
max_retries = 3
if self.check_autotagging_urls:
url = self.add_autotagging_param(url)
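# Retry loop: each iteration issues one request; transient network errors keep can_retry set,
# up to max_retries attempts or until the global error ratio is exceeded.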
while can_retry:
error = None
retries += 1
status = None
start_time = time.monotonic()
try:
self.requests += 1
if retries > 1:
logger.info('Retry #%d for URL %s', retries, url)
with url_check_seconds.time():
async with method(
url,
headers=self.headers,
allow_redirects=True,
timeout=self.timeout,
proxy=self.proxy,
) as response: # type: aiohttp.ClientResponse
can_retry = False
status = response.status
results.iat[index, CODE_INDEX] = status
if response.history:
location = str(response.url)
else:
location = response.headers.get('Location', None)
if status >= 400:
url_check_errors_total.labels(response.status).inc()
else:
if self.check_autotagging_urls:
param_present = self.check_autotagging_param(response.url)
if not param_present:
error = 'Auto-Tagging Parameter Removed'
results.iat[index, CODE_INDEX] = 0
if self.force_https and error is None:
if response.history:
if response.url.scheme == 'https':
results.iat[index, CODE_INDEX] = response.history[0].status
else:
error = 'Redirect URL Not Secure (HTTP)'
results.iat[index, CODE_INDEX] = 0
else:
if response.url.scheme == 'https':
results.iat[index, CODE_INDEX] = response.status
else:
error = 'Not Secure (HTTP)'
results.iat[index, CODE_INDEX] = 0
if location:
results.iat[index, REDIRECT_TO_INDEX] = location
except GeneratorExit:
raise
except client_exceptions.TooManyRedirects:
error = 'Redirect Loop'
except (aiohttp.ServerTimeoutError, TimeoutError):
error = 'Timeout while fetching URL.'
except aiohttp.ServerDisconnectedError:
error = 'Server disconnected during the connection.'
except (aiohttp.ServerConnectionError, aiohttp.ClientConnectorError):
error = 'Error while connecting to the server.'
except aiohttp.ClientResponseError as client_resp_exc:
can_retry = False
error = 'Invalid response retrieved.'
self.log_url_exception(task_ctx, url, client_resp_exc)
except aiohttp.ClientPayloadError as client_payload_exc:
can_retry = False
error = 'Malformed response retrieved.'
self.log_url_exception(task_ctx, url, client_payload_exc)
except ssl.CertificateError as cert_exc:
can_retry = False
error = f'Certificate error: {str(cert_exc)}'
except aiohttp.InvalidURL:
can_retry = False
error = 'Invalid URL'
except ValueError as exc:
can_retry = False
error = str(exc)
except Exception:
can_retry = False
logger.exception('Unknown error occurred.')
error = 'Unknown error occurred.'
latency = time.monotonic() - start_time
self._current_delay = self.compute_next_delay(status, latency)
if error is not None:
url_check_errors_total.labels('error').inc()
self.domains[hostname].errors += 1
self.errors += 1
if can_retry:
# Maximum error ratio is determined by total number of requests made so far.
allowed_error_ratio = self.get_allowed_error_ratio(self.requests)
if retries >= max_retries or (self.errors / self.requests) >= allowed_error_ratio:
can_retry = False
else:
await asyncio.sleep(self.retry_sleep_interval * max(1.0, self._current_delay))
if error is not None:
results.iat[index, ERROR_INDEX] = error
self.urls_done += 1
if self.urls_done % 100 == 0:
logger.info('%d/%d URLs checked.', self.urls_done, len(results.index))
finally:
queue.task_done()
except GeneratorExit:
raise
except Exception:
logger.exception('Internal PPC Robot error.')
finally:
self.workers_running -= 1
# If I am the last worker and the queue is still not empty, fill the remaining URLs with errors.
if self.workers_running < 1 and queue.qsize() > 0:
logger.warning('Cleaning up the queue!')
while True:
index, url, hostname = await queue.get()
try:
if index is None:
break
results.iat[index, URL_INDEX] = url
results.iat[index, ERROR_INDEX] = 'Internal PPC Robot error.'
finally:
queue.task_done()
def compute_next_delay(self, status: int | None, latency: float) -> float:
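# Heuristic: average the current delay with the observed latency spread across all workers,
# then clamp the result to the [_min_delay, _max_delay] interval.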
target_delay = latency / self.parallel_tasks
next_delay = (self._current_delay + target_delay) / 2
# Non-successful responses are not allowed to decrease the delay.
if status is None or status < 200 or status >= 300:
next_delay = max(next_delay, self._current_delay)
if next_delay < self._min_delay:
next_delay = self._min_delay
elif next_delay > self._max_delay:
next_delay = self._max_delay
return next_delay
def add_autotagging_param(self, url: str) -> str:
"""
Adds the ``gclid`` auto-tagging parameter to the given URL.
:param url: Input URL.
:return: URL extended with the auto-tagging parameter.
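For example (the table names passed to the constructor are only illustrative):
>>> step = CheckUrlsStep('urls', 'check_result', column='Url')
>>> step.add_autotagging_param('https://example.com/page?a=1')
'https://example.com/page?a=1&gclid=PPCRobot'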
"""
url_split: SplitResult = urlsplit(url)
query = url_split[3]
query_dict = parse_qs(query)
query_dict['gclid'] = self.gclid_val
new_query = urlencode(query_dict, doseq=True)
new_url = list(url_split)
new_url[3] = new_query
return urlunsplit(new_url)
def check_autotagging_param(self, url: URL) -> bool:
"""
Checks whether the auto-tagging parameter is present in the URL and matches the original value.
:param url: URL to check.
:return: ``True`` if the parameter is present and unchanged, ``False`` otherwise.
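For example (``URL`` comes from :py:mod:`yarl`; the step instance is only illustrative):
>>> step = CheckUrlsStep('urls', 'check_result', column='Url')
>>> step.check_autotagging_param(URL('https://example.com/?gclid=PPCRobot'))
True
>>> step.check_autotagging_param(URL('https://example.com/'))
False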
"""
result = False
if 'gclid' in url.query:
if url.query['gclid'] == self.gclid_val:
result = True
return result
@staticmethod
def get_allowed_error_ratio(requests) -> float:
"""
Gets the maximum allowed error ratio based on the number of requests made so far.
More requests mean a lower allowed error ratio.
:param requests: Number of requests made so far.
:return: Allowed error ratio between 0.0 and 1.0.
"""
if requests >= 250:
return 0.05
elif requests >= 125:
return 0.1
elif requests >= 25:
return 0.25
else:
return 1.0
def log_url_exception(self, task_ctx: TaskContextInterface, url: str, exc: Exception):
"""
Logs an exception that was caught when retrieving the URL, but only if this was the first occurrence
of this exception type.
:param task_ctx: Task context.
:param url: URL.
:param exc: Exception instance.
"""
if type(exc) not in self.errors_sent:
logger.exception(
f'An exception of type {str(type(exc))} was caught when retrieving URL {url}. '
'Only the first occurrence of this exception type is logged.'
)
if task_ctx.error_reporter:
task_ctx.error_reporter(exc)
self.errors_sent.add(type(exc))