import logging
import random
import re
import time
from json import JSONDecodeError
from typing import Any
import requests
from prometheus_client import Counter, Summary
from ppc_robot_lib.sklik.exceptions import SklikCallError, SklikError, SklikRateLimitError
from ppc_robot_lib.sklik.session import SklikSession
from ppc_robot_lib.utils import http
from ppc_robot_lib.utils.http import is_rate_limit_error, is_server_error
from ppc_robot_lib.utils.json import get_encoder
# Default base URL of the Sklik JSON API. The client appends the operation name (e.g. ``client.stats``)
# directly to this value to build the request URL, so it has to end with a slash.
DEFAULT_ENDPOINT = 'https://api.sklik.cz/drak/json/v5/'
# Logger used by the Sklik client for retry and error reporting.
logger = logging.getLogger('ppc_robot_lib.sklik')
# Prometheus metric: time spent in Sklik API requests, labelled by the name of the called operation.
sklik_request_seconds = Summary('robot_sklik_request_seconds', 'Time spent with Sklik API Calls.', ['operation'])
# Prometheus metric: number of failed Sklik API request attempts, labelled by the name of the called operation.
sklik_request_errors_total = Counter(
'robot_sklik_request_errors_total', 'Total errors ocurred when calling Sklik API.', ['operation']
)
# NOTE: a stray "[docs]" Sphinx source-link marker (HTML extraction artifact, not code) was removed here.
class SklikClient:
    """
    Client for accessing the Sklik API.

    Performs authentication, session handling, and provides a generic :py:meth:`call` method for calling
    any other operations in the API. This method can automatically inject session and user IDs as the first parameter.

    Failed requests are retried with a randomized, exponentially growing delay for as long as the error is
    considered retryable (see :py:meth:`_should_retry`) and the retry budget (``max_retries``) is not used up.
    """

    # Extracts the number of seconds from rate-limit messages such as
    # "Too many requests. Has to wait 54[s]. For check limits call 'api.limits'".
    _WAIT_TIME_RE = re.compile(r'wait ([0-9]{1,3})\[s\]')

    def __init__(
        self,
        user_id: int | None = None,
        endpoint: str = DEFAULT_ENDPOINT,
        max_retries: int = 5,
        session: SklikSession | None = None,
        timeout: float | tuple[float, float] | None = None,
    ):
        """
        :param user_id: ID of the Sklik account. When set, it is sent as ``userId`` with authenticated calls.
        :param endpoint: Sklik API endpoint. Defaults to ``https://api.sklik.cz/drak/json/v5/``. The operation
            name is appended directly to this value.
        :param max_retries: Maximum number of retries of a single failed request.
        :param session: Session object used to store the session ID. A new :py:class:`SklikSession` is created
            when omitted.
        :param timeout: Value passed unchanged as the ``timeout`` argument of ``requests.post``. ``None``
            (default) keeps the original behaviour of not passing any time limit.
        """
        self._session = session if session is not None else SklikSession()
        self._user_id = user_id
        self._endpoint = endpoint
        # Lazily filled cache of the ``api.limits`` response, see ``get_batch_limit``.
        self._batch_limits = None
        self._max_retries = max_retries
        self._timeout = timeout

    @property
    def user_id(self) -> int | None:
        """
        :return: Current user ID, or ``None`` if no user ID is set.
        """
        return self._user_id

    @user_id.setter
    def user_id(self, value: int | None) -> None:
        """
        :param value: New value for the user ID.
        """
        self._user_id = value

    @property
    def logged_in(self) -> bool:
        """
        :return: ``True`` if the client is logged in, i.e. the session object holds a value.
        """
        return self._session.has_value()

    def get_batch_limit(self, operation: str, default_limit: int = 100) -> int:
        """
        Gets the limit of report rows/request entities for the given operation.

        This function retrieves the result of ``api.limits`` operation, caches it, and tries to look up the limit
        in the following order:

        Given the operation in the format ``<entity>.<method>`` (e.g. ``keywords.update``):

        1. ``<entity>.<method>`` (``keywords.update``)
        2. ``global.<method>`` (``global.update``)
        3. Return value of the ``default_limit`` parameter.

        :param operation: Name of the operation, e.g. ``keywords.create``.
        :param default_limit: Default value to use when the Sklik API does not return any limit for the operation.
        :return: Limit for the given operation.
        """
        if self._batch_limits is None:
            self._batch_limits = self.load_limits()

        # The most specific key wins; the global key is computed only once.
        for limit_name in (operation, self._global_op(operation)):
            if limit_name in self._batch_limits:
                return self._batch_limits[limit_name]
        return default_limit

    def call(self, operation: str, *args, **kwargs) -> dict[str, Any]:
        """
        Performs the given API call and returns its result.

        If the ``authenticated`` kwarg is set to ``True`` (or not given at all), this function passes the
        object with ``session`` (and ``userId``, if it was set during the creation) as the first argument. You have
        to omit the argument when performing call.

        If the response contains a ``session`` key, its value is stored in the session object and the key
        is removed from the returned dictionary.

        Example::

            >>> client = SklikClient(user_id=10)
            >>> client.login('<api_token>')
            >>> client.call('client.stats', {'dateFrom': '2020-01-01', 'dateTo': '2020-01-02', 'granularity': 'daily'})
            {
                'status': 200,
                'statusMessage': 'OK',
                'report': [
                    {
                        'date': '20200101T00:00:00+0000',
                        'impressions': 0,
                        'clicks': 0,
                        'ctr': 0,
                        'cpc': 0,
                        'price': 0,
                        'avgPosition': 0.0,
                        'conversions': 0,
                        'conversionRatio': 0,
                        'conversionAvgPrice': 0,
                        'conversionValue': 0,
                        'conversionAvgValue': 0,
                        'conversionValueRatio': 0,
                        'transactions': 0,
                        'transactionAvgPrice': 0,
                        'transactionAvgValue': 0,
                        'transactionAvgCount': 0
                    },
                    {
                        'date': '20200102T00:00:00+0000',
                        'impressions': 0,
                        'clicks': 0,
                        'ctr': 0,
                        'cpc': 0,
                        'price': 0,
                        'avgPosition': 0.0,
                        'conversions': 0,
                        'conversionRatio': 0,
                        'conversionAvgPrice': 0,
                        'conversionValue': 0,
                        'conversionAvgValue': 0,
                        'conversionValueRatio': 0,
                        'transactions': 0,
                        'transactionAvgPrice': 0,
                        'transactionAvgValue': 0,
                        'transactionAvgCount': 0
                    }
                ]
            }

        :param operation: Name of the Sklik API operation to call, e.g. ``keywords.create``.
        :param args: Arguments of the operation. If ``authenticated`` is ``True`` (default), start with the second
            argument.
        :keyword authenticated: If set to ``True`` (default value), session ID will be passed as the first parameter
            of the call.
        :keyword include_user_id: If set to ``True`` (default value), user ID will be passed in the first parameter
            of the call.
        :return: Response from the API as a dictionary.
        :raises SklikCallError: if the operation did not succeed. The error raised internally is handed to
            ``SklikError.raise_from_sklik_call_error`` (defined outside this module), which decides the exact
            exception type seen by the caller.
        """
        authenticated = kwargs.get('authenticated', True)
        include_user_id = kwargs.get('include_user_id', True)

        # Always build a list, so that the payload type does not depend on the ``authenticated`` flag.
        arguments = list(args)
        if authenticated:
            arguments.insert(0, self._create_user_struct(include_user_id))

        try:
            response_data = self._execute_request(operation, f'{self._endpoint}{operation}', arguments)
            if response_data and 'session' in response_data:
                # Remember the (possibly refreshed) session ID and hide it from the caller.
                self._session.value = response_data.pop('session')
            return response_data
        except SklikCallError as exc:
            SklikError.raise_from_sklik_call_error(exc)

    def login(self, login_key: str) -> bool:
        """
        Logs into to the Sklik API and stores the session ID.

        :param login_key: Sklik API token.
        :return: Always ``True``. If the login is not successful, an exception is raised.
        :raises SklikCallError: if the login did not succeed.
        """
        # The session ID from the response is stored by ``call`` itself.
        self.call('client.loginByToken', login_key, authenticated=False)
        return True

    def logout(self) -> bool:
        """
        Logs out of the Sklik API and resets the stored session.

        :return: Always ``True``. If the logout is not successful, an exception is raised.
        :raises SklikCallError: if the logout did not succeed.
        """
        self.call('client.logout', include_user_id=False)
        self._session.reset()
        return True

    def load_limits(self) -> dict[str, int]:
        """
        Loads the batch operation limits using the ``api.limits`` call. Requires the client to be authenticated,
        because the ``api.limits`` call requires user authentication.

        :return: Mapping of limit name to limit value, built from the ``batchCallLimits`` list of the response.
        """
        limits_response = self.call('api.limits', include_user_id=False)
        return {item['name']: item['limit'] for item in limits_response['batchCallLimits']}

    def _create_user_struct(self, include_user_id: bool = True) -> dict[str, str | int]:
        """
        Creates a structure with session and user ID.

        :param include_user_id: Include the user ID in the object? The user ID is added only if it is set.
        :return: Dictionary: ``{'session': <session: str>, 'userId': <user_id: int>}``.
        """
        struct = {'session': self._session.value}
        if self._user_id and include_user_id:
            struct['userId'] = self._user_id
        return struct

    def _execute_request(self, operation: str, url: str, payload: list[Any]) -> dict[str, Any]:
        """
        Executes an API request. Handles automatic retries in case of an error.

        Every failed attempt increments the ``robot_sklik_request_errors_total`` metric and is passed to
        :py:meth:`_handle_request_error`, which either sleeps (the request is then repeated) or raises.

        :param operation: Name of the operation (used as the metric label).
        :param url: Target URL.
        :param payload: List with operation parameters.
        :return: Request response.
        :raises SklikCallError: if the request failed and cannot (or should no longer) be retried.
        """
        # The payload is the same for all attempts, serialize it only once.
        body = get_encoder().encode(payload)
        retries = 0

        while True:
            response_data = None
            try:
                with sklik_request_seconds.labels(operation).time():
                    response = requests.post(
                        url, data=body, headers={'Content-Type': 'application/json'}, timeout=self._timeout
                    )
                    response.raise_for_status()
                    response_data = response.json()

                # Error treatment according to the 'status' parameter of the response body.
                if 'status' in response_data:
                    status_code = self._get_status_code(response_data)
                else:
                    status_code = 200

                if (
                    is_server_error(status_code)
                    or self._is_http_client_error(status_code)
                    or self._is_user_serviced(status_code)
                ):
                    raise SklikCallError(status_code, self._get_message(response_data), response_data)
                elif is_rate_limit_error(status_code):
                    raise SklikRateLimitError(status_code, self._get_message(response_data), response_data)

                return response_data
            # Error treatment according to the raised exceptions.
            except JSONDecodeError as exc:
                sklik_request_errors_total.labels(operation).inc()
                # JSON errors generally mean that the remote service is not available. Treat it as a 503.
                self._handle_request_error(retries, exc, status_code=503)
            except (requests.Timeout, requests.ConnectionError, ConnectionError, TimeoutError) as exc:
                sklik_request_errors_total.labels(operation).inc()
                # Timeout should be considered equivalent to 503.
                self._handle_request_error(retries, exc, status_code=503)
            except requests.RequestException as exc:
                sklik_request_errors_total.labels(operation).inc()
                error_message = None
                sleep_time = None
                # Extract the status code, response data and error message.
                if exc.response is not None:
                    status_code = exc.response.status_code
                    if is_rate_limit_error(status_code):
                        # Use increased wait time for HTTP rate limiting errors:
                        sleep_time = random.uniform(2, 4) * (2**retries)  # noqa: S311, DUO102
                    elif not http.is_gateway_error(status_code):
                        try:
                            response_data = exc.response.json()
                            error_message = self._get_message(response_data, 'no error specified')
                        except JSONDecodeError:
                            logger.exception(
                                'Unable to parse JSON from response with HTTP status code %d.', status_code
                            )
                else:
                    # Errors without response are usually connection-related errors.
                    # These errors should be considered equivalent to 503.
                    status_code = 503
                self._handle_request_error(retries, exc, status_code, error_message, response_data, sleep_time)
            except SklikRateLimitError as exc:
                # Has to be handled before SklikCallError; presumably SklikRateLimitError is its subclass.
                sklik_request_errors_total.labels(operation).inc()
                # Randomize the wait time to prevent thundering herd problem.
                retry_after = exc.retry_after * random.uniform(1, 1.25)  # noqa: S311, DUO102
                if exc.message:
                    # A wait time announced in the message takes precedence over the default one.
                    wait_match = self._WAIT_TIME_RE.search(exc.message)
                    if wait_match:
                        retry_after = int(wait_match.group(1))
                self._handle_request_error(retries, exc, sleep_time=retry_after)
            except SklikCallError as exc:
                sklik_request_errors_total.labels(operation).inc()
                self._handle_request_error(retries, exc)

            # Reached only when the handler decided to retry (it raises otherwise).
            retries += 1

    def _handle_request_error(
        self, retries, error=None, status_code=None, error_message=None, response_data=None, sleep_time=None
    ):
        """
        Decides what to do with a failed request attempt: either sleeps, so that the caller can repeat
        the request, or raises an exception when the request must not be repeated.

        :param retries: Number of retries that were already performed for the request (0 for the first attempt).
        :param error: Exception object.
        :param status_code: e.g. 415. If omitted, ``error.code`` is used when ``error`` is a ``SklikCallError``.
        :param error_message: e.g. Too many requests. Has to wait 54[s]. For check limits call 'api.limits'
            (``error.response.json()['statusMessage']``). Falls back to ``str(error)``.
        :param response_data: JSON format data (``error.response.json()``).
        :param sleep_time: Time to wait before the next attempt in seconds. A random value that grows
            exponentially with ``retries`` is used when omitted.
        :return: ``None`` if the request should be repeated.
        :raises SklikCallError: if all retries were used up, or the status code is not retryable.
        """
        # If the status code is missing, it is expected to be carried by the error object.
        if status_code is None and isinstance(error, SklikCallError):
            status_code = error.code

        # ``retries`` counts the retries already done, so the budget is exhausted once it reaches the maximum.
        if retries >= self._max_retries or not self._should_retry(status_code):
            if isinstance(error, SklikCallError):
                raise error
            raise SklikCallError(
                status_code, error_message if error_message else str(error), response_data
            ) from error

        if sleep_time is None:
            sleep_time = random.random() * (2**retries)  # noqa: S311, DUO102
        logger.info('Going to sleep for %.3f before retrying request.', sleep_time)
        time.sleep(sleep_time)

    @classmethod
    def _should_retry(cls, code: int) -> bool:
        """
        Checks if the request can be retried.

        The request is retried if the code is a rate-limit error (as decided by ``is_rate_limit_error``;
        presumably 415 and 429), a server error (as decided by ``is_server_error``), or 301 ("user is serviced").
        For rate-limit errors, the statusMessage field in the response is checked for the wait time.
        In case of other codes, the request is not repeated.

        :param code: Status code.
        :return: True if the request that returned the given code should be repeated, False otherwise.
        """
        return is_rate_limit_error(code) or is_server_error(code) or cls._is_user_serviced(code)

    @staticmethod
    def _is_http_client_error(code: int) -> bool:
        """
        Checks if the error is an unrecoverable HTTP client error. Rate limiting errors are not considered
        "unrecoverable", because the same request can be retried later.

        :param code: Status code.
        :return: ``True`` if it is an 4xx code (other than 415 and 429) indicating that the same request
            should not be repeated.
        """
        return 400 <= code < 500 and code not in (415, 429)

    @staticmethod
    def _is_user_serviced(code: int) -> bool:
        """
        Checks if the given status code indicates the "user is serviced" status (whatever that means for Sklik).

        :param code: Status code.
        :return: ``True`` if the code indicates "user is serviced" (301).
        """
        return code == 301

    @staticmethod
    def _get_status_code(response_data: dict) -> int:
        """
        Extract status code from the API response.

        :param response_data: Documented {'status': 400, 'statusMessage': 'Server error!'}
            Non-documented {'status': 'error', 'message': 'Server error!', 'code': 500}
        :return: Status code, or 0 if the value is neither an integer nor a string of decimal digits.
        """
        if 'code' in response_data and response_data['status'] == 'error':
            status = response_data['code']
        else:
            status = response_data['status']

        if isinstance(status, int):
            return int(status)
        # ``isdecimal`` (unlike ``isnumeric``) accepts only characters that ``int()`` is able to parse.
        if isinstance(status, str) and status.isdecimal():
            return int(status)
        return 0

    @staticmethod
    def _get_message(response_data: dict[str, Any], default: str = '') -> str:
        """
        Extracts status message from the API response.

        The documented ``statusMessage`` key is preferred; the ``message`` key of the non-documented error
        format (``{'status': 'error', 'message': 'Server error!', 'code': 500}``) is used as a fallback.

        :param response_data: API response.
        :param default: Default message.
        :return: Status message from the response, or ``default`` if the response does not contain any.
        """
        if response_data:
            for key in ('statusMessage', 'message'):
                if key in response_data:
                    return response_data[key]
        return default

    @classmethod
    def _global_op(cls, operation: str) -> str:
        """
        Gets the name of the global limit for given operation.

        :param operation: Full name of the operation: ``<entity>.<method>``
        :return: Global operation: ``global.<method>``. An operation name without a dot is returned unchanged.
        """
        _entity, separator, method = operation.partition('.')
        return f'global.{method}' if separator else operation