Source code for ppc_robot_lib.sklik.client

import logging
import random
import re
import time
from json import JSONDecodeError
from typing import Any

import requests
from prometheus_client import Counter, Summary

from ppc_robot_lib.sklik.exceptions import SklikCallError, SklikError, SklikRateLimitError
from ppc_robot_lib.sklik.session import SklikSession
from ppc_robot_lib.utils import http
from ppc_robot_lib.utils.http import is_rate_limit_error, is_server_error
from ppc_robot_lib.utils.json import get_encoder


DEFAULT_ENDPOINT = 'https://api.sklik.cz/drak/json/v5/'
logger = logging.getLogger('ppc_robot_lib.sklik')

sklik_request_seconds = Summary('robot_sklik_request_seconds', 'Time spent with Sklik API Calls.', ['operation'])
sklik_request_errors_total = Counter(
    'robot_sklik_request_errors_total', 'Total errors ocurred when calling Sklik API.', ['operation']
)


[docs] class SklikClient: """ Client for accessing the Sklik API. Performs authentication, session handling, and provides a generic :py:meth:`call` method for calling any other operations in the API. This method can automatically inject session and user IDs as the first parameter. """ def __init__( self, user_id: int = None, endpoint: str = DEFAULT_ENDPOINT, max_retries: int = 5, session: SklikSession | None = None, ): """ :param user_id: ID of the Sklik account. :param endpoint: Sklik API endpoint. Currently points to ``https://api.sklik.cz/drak/json/v5/``. :param max_retries: Maximum number of retries """ self._session = session if session is not None else SklikSession() self._user_id = user_id self._endpoint = endpoint self._batch_limits = None self._max_retries = max_retries @property def user_id(self) -> int: """ :return: Current user ID. """ return self._user_id @user_id.setter def user_id(self, value: int | None) -> None: """ :param value: New value for the user ID. """ self._user_id = value @property def logged_in(self): """ :return: ``True`` if the client is logged in. """ return self._session.has_value()
[docs] def get_batch_limit(self, operation: str, default_limit: int = 100) -> int: """ Gets the limit of report rows/request entities for the given operation. This function retrieves the result of ``api.limits`` operation, caches it, and tries to look up the limit in the following order: Given the operation in the format ``<entity>.<method>`` (e.g. ``keywords.update``): 1. ``<entity>.<method>`` (``keywords.update``) 2. ``global.<method>`` (``global.update``) 3. Return value of the ``default_limit`` parameter. :param operation: Name of the operation, e.g. ``keywords.create``. :param default_limit: Default value to use when the Sklik API does not return any limit for the operation. :return: Limit for the given operation. """ if self._batch_limits is None: self._batch_limits = self.load_limits() if operation in self._batch_limits: return self._batch_limits[operation] elif self._global_op(operation) in self._batch_limits: return self._batch_limits[self._global_op(operation)] else: return default_limit
[docs] def call(self, operation, *args, **kwargs) -> dict[str, Any]: """ Performs the given API call and returns its result. If the ``authenticated`` kwarg is set to ``True`` (or not given at all), this function passes the object with ``session`` (and ``userId``, if it was set during the creation) as the first argument. You have to omit the argument when performing call. Example:: >>> client = SklikClient(user_id=10) >>> client.login('<api_token>') >>> client.call('client.stats', {'dateFrom': '2020-01-01', 'dateTo': '2020-01-02', 'granularity': 'daily'}) { 'status': 200, 'statusMessage': 'OK', 'report': [ { 'date': '20200101T00:00:00+0000', 'impressions': 0, 'clicks': 0, 'ctr': 0, 'cpc': 0, 'price': 0, 'avgPosition': 0.0, 'conversions': 0, 'conversionRatio': 0, 'conversionAvgPrice': 0, 'conversionValue': 0, 'conversionAvgValue': 0, 'conversionValueRatio': 0, 'transactions': 0, 'transactionAvgPrice': 0, 'transactionAvgValue': 0, 'transactionAvgCount': 0 }, { 'date': '20200102T00:00:00+0000', 'impressions': 0, 'clicks': 0, 'ctr': 0, 'cpc': 0, 'price': 0, 'avgPosition': 0.0, 'conversions': 0, 'conversionRatio': 0, 'conversionAvgPrice': 0, 'conversionValue': 0, 'conversionAvgValue': 0, 'conversionValueRatio': 0, 'transactions': 0, 'transactionAvgPrice': 0, 'transactionAvgValue': 0, 'transactionAvgCount': 0 } ] } :param operation: Name of the Sklik API operation to call, e.g. ``keywords.create``. :param args: Arguments of the operation. If ``authenticated`` is ``True`` (default), start with the second argument. :keyword authenticated: If set to ``True`` (default value), session ID will be passed as the first parameter of the call. :keyword include_user_id: If set to ``True`` (default value), user ID will be passed in the first parameter of the call. :return: Response from the API as a dictionary. :raises SklikCallError: if the operation did not succeed. """ authenticated = kwargs.get('authenticated', True) include_user_id = kwargs.get('include_user_id', True) if authenticated: arguments = [self._create_user_struct(include_user_id)] + list(args) else: arguments = args try: response_data = self._execute_request(operation, f'{self._endpoint}{operation}', arguments) if response_data and 'session' in response_data: self._session.value = response_data['session'] del response_data['session'] return response_data except SklikCallError as exc: SklikError.raise_from_sklik_call_error(exc)
[docs] def login(self, login_key: str) -> bool: """ Logs into to the Sklik API and stores the session ID. :param login_key: Sklik API token. :return: Always ``True``. If the login is not successful, an exception is raised. :raises SklikCallError: if the login did not succeed. """ self.call('client.loginByToken', login_key, authenticated=False) return True
[docs] def logout(self) -> bool: """ Logs out of the Sklik API :return: Always ``True``. If the logout is not successful, an exception is raised. :raises SklikCallError: if the logout did not succeed. """ self.call('client.logout', include_user_id=False) self._session.reset() return True
[docs] def load_limits(self) -> dict[str, int]: """ Loads the batch operation limits using the ``api.limits`` call. Requires the client to be authenticated, because the ``api.limits`` call requires user authentication. """ limits_response = self.call('api.limits', include_user_id=False) return {item['name']: item['limit'] for item in limits_response['batchCallLimits']}
def _create_user_struct(self, include_user_id: bool = True) -> dict[str, str | int]: """ Creates a structure with session and user ID. :param include_user_id: Include the user ID in the object? :return: Dictionary: ``{'session': <session: str>, 'userId': <user_id: int>}``. """ struct = {'session': self._session.value} if self._user_id and include_user_id: struct['userId'] = self._user_id return struct def _execute_request(self, operation: str, url: str, payload: list[Any]) -> dict[str, Any]: """ Executes an API request. Handles automatic retries in case of an error. :param operation: Name of the operation. :param url: Target URL. :param payload: List with operation parameters. :return: Request response. """ success = False retries = 0 json_encoder = get_encoder() while not success: response_data = None try: with sklik_request_seconds.labels(operation).time(): response = requests.post( url, data=json_encoder.encode(payload), headers={'Content-Type': 'application/json'} ) response.raise_for_status() response_data = response.json() # error treatment according to 'status' parameter if 'status' in response_data: status_code = self._get_status_code(response_data) else: status_code = 200 if ( is_server_error(status_code) or self._is_http_client_error(status_code) or self._is_user_serviced(status_code) ): raise SklikCallError(status_code, self._get_message(response_data), response_data) elif is_rate_limit_error(status_code): raise SklikRateLimitError(status_code, self._get_message(response_data), response_data) success = True return response_data # error treatment according to API exceptions except JSONDecodeError as exc: sklik_request_errors_total.labels(operation).inc() # JSON errors generally means that the remote service is not available. Treat is as a 503 self._handle_request_error(retries, exc, status_code=503) except (requests.Timeout, requests.ConnectionError, ConnectionError, TimeoutError) as exc: sklik_request_errors_total.labels(operation).inc() # Timeout should be considered equivalent to 503 self._handle_request_error(retries, exc, status_code=503) except requests.RequestException as exc: sklik_request_errors_total.labels(operation).inc() error_message = None sleep_time = None # Extract the status code, response data and error message. if exc.response is not None: status_code = exc.response.status_code if is_rate_limit_error(status_code): # Use increased wait time for HTTP Rate limiting errors: sleep_time = random.uniform(2, 4) * (2**retries) # noqa: S311, DUO102 elif not http.is_gateway_error(status_code): try: response_data = exc.response.json() error_message = self._get_message(response_data, 'no error specified') except JSONDecodeError: logger.exception( 'Unable to parse JSON from response with HTTP status code %d.', status_code ) else: # Errors without response are usually connection-related errors. # These errors should be considered equivalent to 503. status_code = 503 self._handle_request_error(retries, exc, status_code, error_message, response_data, sleep_time) except SklikRateLimitError as exc: sklik_request_errors_total.labels(operation).inc() # Randomize the wait time to prevent thundering herd problem. retry_after = exc.retry_after * random.uniform(1, 1.25) # noqa: S311, DUO102 if exc.message: search_string = re.search(r'wait ([0-9]{1,3})\[s\]', exc.message) retry_after = int(search_string.group(1)) if search_string else retry_after self._handle_request_error(retries, exc, sleep_time=retry_after) except SklikCallError as exc: sklik_request_errors_total.labels(operation).inc() self._handle_request_error(retries, exc) finally: retries += 1 def _handle_request_error( self, retries, error=None, status_code=None, error_message=None, response_data=None, sleep_time=None ): """ :param retries: No of repeating :param error: Exception object :param status_code: e.g. 415 (error.code) :param error_message: e.g. Too many requests. Has to wait 54[s]. For check limits call 'api.limits' error.response.json()['statusMessage']) :param response_data: JSON format data (error.response.json()) :param sleep_time: time to wait :return: """ # if error_code (status_code) missing it is excepted to be merged in error object if status_code is None and isinstance(error, SklikCallError): status_code = error.code if retries > self._max_retries or not self._should_retry(status_code): if error and isinstance(error, SklikCallError): raise error else: raise SklikCallError( status_code, error_message if error_message else str(error), response_data ) from error else: if sleep_time is None: sleep_time = random.random() * (2**retries) # noqa: S311, DUO102 logger.info('Going to sleep for %.3f before retrying request.', sleep_time) time.sleep(sleep_time) @classmethod def _should_retry(cls, code: int) -> bool: """ Checks if the request can be retried. The request should be retried if the code is 3xx, 5xx, 415, or 429. For 415 and 429, the statusMessage field in the response should be checked for wait time. In case of 4xx and other codes, the request should not be repeated. :return: True if the request that returned the given code should be repeated, False otherwise. """ return is_rate_limit_error(code) or is_server_error(code) or cls._is_user_serviced(code) @staticmethod def _is_http_client_error(code: int) -> bool: """ Checks if the error is an unrecoverable HTTP client error. Rate limiting errors are not considered "unrecoverable", because the same request can be retried later. :param code: Status code. :return: ``True`` if it is an 4xx code indicating that the same request should not be repeated. """ return 400 <= code < 500 and code != 415 and code != 429 @staticmethod def _is_user_serviced(code: int) -> bool: """ Checks if the given status code indicates the "user is serviced" status (whatever that means for Sklik). :param code: Status code. :return: ``True`` if the code indicates "user is serviced". """ return code == 301 @staticmethod def _get_status_code(response_data: dict) -> int: """ Extract status code from the API response. :param response_data: Documented {'status': 400, 'statusMessage': 'Server error!'} Non-documented {'status': 'error', 'message': 'Server error!', 'code': 500} :return: Status code. """ if 'code' in response_data and response_data['status'] == 'error': status = response_data['code'] else: status = response_data['status'] return int(status) if isinstance(status, int) or (isinstance(status, str) and status.isnumeric()) else 0 @staticmethod def _get_message(response_data: dict[str, Any], default: str = '') -> str: """ Extracts status message from the API response. :param response_data: API response. :param default: Default message. :return: Status message from the response. """ if response_data and 'statusMessage' in response_data: return response_data['statusMessage'] else: return default @classmethod def _global_op(cls, operation: str): """ Gets the name of the global limit for given operation. :param operation: Full name of the operation: ``<entity>.<method>`` :return: Global operation: ``global.<method>``. """ parts = operation.split('.', 1) if len(parts) > 1: return 'global.' + parts[1] else: return parts[0]