Source code for ppc_robot_lib.output.google_spreadsheet

import itertools
import random
from collections import defaultdict, namedtuple
from copy import copy
from dataclasses import dataclass, field
from typing import Any

import babel
from googleapiclient.errors import HttpError

from ppc_robot_lib.google_drive import GoogleDriveConnector
from ppc_robot_lib.google_drive.connector import is_not_trashed
from ppc_robot_lib.google_drive.exceptions import GoogleDriveError
from ppc_robot_lib.google_drive.folder import get_target_folder
from ppc_robot_lib.google_spreadsheet import GoogleSpreadsheetConnector
from ppc_robot_lib.google_spreadsheet.exceptions import GoogleSpreadsheetError
from ppc_robot_lib.output.abstract_adapter import AbstractOutputAdapter
from ppc_robot_lib.output.currency import OfficeCurrencyFormat
from ppc_robot_lib.output.google_sheet.charts import ChartDefinitionFactory
from ppc_robot_lib.output.iter import SheetDataAdapter
from ppc_robot_lib.output.resolver import ColumnIndexResolver
from ppc_robot_lib.output.types import OutputContext, SheetData
from ppc_robot_lib.output.utils import expand_column_references, get_text_width
from ppc_robot_lib.tasks import task_context
from ppc_robot_lib.utils.changes import ItemAddition, ItemDeletion, compute_list_changes
from ppc_robot_lib.utils.dictionary import get_dict_value_paths
from ppc_robot_lib.utils.types import (
    BoolConditionOperator,
    BoolConditionRule,
    CellStyle,
    Color,
    Column,
    FormatType,
    FormattedText,
    FormattedTextNode,
    GradientPoint,
    GradientRule,
    Image,
    Sparkline,
    SparklineOptions,
    SparklineType,
    Format,
)

FormulaSeparators = namedtuple('FormulaSeparators', ['parameters', 'cells', 'rows'])

LETTER_OFFSET = ord('@')

# Mapping of separators by decimal point symbol.
LOCALIZED_FORMULA_SEPARATORS: dict[str, FormulaSeparators] = {
    '.': FormulaSeparators(parameters=',', cells=',', rows=';'),
    ',': FormulaSeparators(parameters=';', cells='\\', rows=';'),
}
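
# Illustrative note (not part of the original module): these separators drive how formulas
# such as SPARKLINE are rendered further below. With a '.' decimal locale a sparkline ends
# up roughly as
#     =SPARKLINE({1;2;3},{"charttype","line"})
# while a ',' decimal locale (e.g. a Czech spreadsheet) switches to
#     =SPARKLINE({1;2;3};{"charttype"\"line"})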


def rowcol_to_a1(row, col):
    """Translates a row and column cell address to A1 notation.
    :param row: The row of the cell to be converted.
                Rows start at index 1.
    :param col: The column of the cell to be converted.
                Columns start at index 1.
    :returns: a string containing the cell's coordinates in A1 notation.
    Example:
    >>> rowcol_to_a1(1, 1)
    A1
    """
    col = int(col)

    div = col
    column_label = ''

    while div:
        (div, mod) = divmod(div, 26)
        if mod == 0:
            mod = 26
            div -= 1
        column_label = chr(mod + LETTER_OFFSET) + column_label

    label = f'{column_label}{int(row)}'

    return label
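
# A few additional reference conversions for the function above (illustrative only):
#   rowcol_to_a1(1, 26)  -> 'Z1'
#   rowcol_to_a1(10, 28) -> 'AB10'
#   rowcol_to_a1(5, 52)  -> 'AZ5'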


@dataclass()
class FilterView:
    id: int | None
    start_row_index: int | None
    end_row_index: int | None
    start_column_index: int | None
    end_column_index: int | None


@dataclass()
class RetrievedSheet:
    id: int
    cond_rule_count: int
    col_count: int
    row_count: int
    basic_filter: FilterView = None
    chart_ids: list[int] = field(default_factory=list)
    frozen_rows: int | None = None
    frozen_columns: int | None = None


@dataclass()
class SpreadsheetProperties:
    title: str = None
    locale_code: str = None
    sheets: dict[str, RetrievedSheet] = field(default_factory=dict)


@dataclass()
class SheetConfiguration:
    data: SheetData
    previous_name: str
    previous_state: dict[str, Any]
    use_default_sheet: bool


class ColumnFormatRun:
    def __init__(self, offset):
        self.offset = offset
        self.cells = []

    def add_format_run(self, index, format_run):
        idx = index - self.offset
        if idx > len(self.cells):
            self.cells.extend(itertools.repeat(None, idx - len(self.cells)))
        self.cells.append(format_run)


class CellFormats:
    def __init__(self, offset):
        self.offset = offset
        self.cells: list[Format | None] = []

    def add_format(self, index, fmt: Format):
        idx = index - self.offset
        if idx > len(self.cells):
            self.cells.extend(itertools.repeat(None, idx - len(self.cells)))
        self.cells.append(fmt)


class RowOutputTransformation(list):
    # noinspection PyMissingConstructor
    def __init__(
        self,
        sheet_data: SheetDataAdapter,
        locale,
        formula_separators,
        row_columns: list[Column] = None,
        currency_column_index: int = None,
        header_rows: int = 0,
    ):
        self._data = sheet_data
        self._locale = locale
        self._formula_separators = formula_separators
        self._row_columns = row_columns if row_columns is not None else []
        self._currency_column_index = currency_column_index
        self._header_rows = header_rows

        self._column_widths: dict[int, int] = defaultdict(int)
        self._column_format_runs: dict[int, ColumnFormatRun] = {}
        self._cell_formats: dict[int, CellFormats] = {}
        self._text_widths: dict[int, int] = defaultdict(int)

        self._currency_columns = self.get_currency_columns()
        self._max_width_columns = self.get_columns_with_max_width()

    def get_currency_columns(self):
        currency_columns = {}
        for index, column in enumerate(self._row_columns):
            if column.cell_format and column.cell_format.format_type == FormatType.CURRENCY:
                currency_columns[index] = column.cell_format
        return currency_columns

    def get_columns_with_max_width(self):
        columns_with_max_width = set()
        for index, column in enumerate(self._row_columns):
            if column.max_width:
                columns_with_max_width.add(index)
        return columns_with_max_width

    @property
    def column_widths(self) -> dict[int, int]:
        return self._column_widths

    @property
    def column_cell_formats(self) -> dict[int, CellFormats]:
        return self._cell_formats

    @property
    def column_format_runs(self) -> dict[int, ColumnFormatRun]:
        return self._column_format_runs

    @property
    def column_text_widths(self) -> dict[int, int]:
        return self._text_widths

    def __len__(self):
        return len(self._data)

    def __iter__(self):
        return self._iter_generator()

    def _iter_generator(self):
        _locale = self._locale
        _formula_separators = self._formula_separators

        for row_idx, row in enumerate(self._data):
            if self._currency_column_index:
                row_currency = row[self._currency_column_index]
            else:
                row_currency = None

            for col_idx, value in enumerate(row):
                has_object = False
                if isinstance(value, FormattedText):
                    row[col_idx] = str(value)
                    if col_idx not in self._column_format_runs:
                        self._column_format_runs[col_idx] = ColumnFormatRun(row_idx)
                    self._column_format_runs[col_idx].add_format_run(
                        row_idx, GoogleSpreadsheetAdapter.create_text_format_runs(value)
                    )
                    has_object = True
                elif isinstance(value, Image):
                    quoted_url = value.url.replace('"', '" & CHAR(34) & "')
                    row[col_idx] = f'=HYPERLINK("{quoted_url}"; IMAGE("{quoted_url}"))'
                    has_object = True
                elif isinstance(value, Sparkline):
                    formula, width = GoogleSpreadsheetAdapter.create_sparkline_formula(
                        value, _locale, _formula_separators
                    )
                    row[col_idx] = formula
                    if width > self._column_widths[col_idx]:
                        self._column_widths[col_idx] = width
                    has_object = True
                elif isinstance(value, str):
                    row[col_idx] = f"'{value}"
                elif row_currency is not None and col_idx in self._currency_columns:
                    if col_idx not in self._cell_formats:
                        self._cell_formats[col_idx] = CellFormats(row_idx)

                    cell_format = copy(self._currency_columns[col_idx])
                    cell_format.pattern = OfficeCurrencyFormat.get_format_string(row_currency, self._locale)

                    self._cell_formats[col_idx].add_format(row_idx, cell_format)

                if col_idx in self._max_width_columns and not has_object and row_idx >= self._header_rows:
                    text_width = get_text_width(value, self._row_columns[col_idx].cell_format)
                    if text_width > self._text_widths[col_idx]:
                        self._text_widths[col_idx] = text_width

            yield row
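
# Illustrative usage sketch (hypothetical `adapter`, `locale`, `separators` and `columns`
# values; not part of the original module). Iterating the transformation converts
# FormattedText, Image and Sparkline cells into plain strings or formulas, while column
# widths, text-format runs and per-cell formats are collected as a side effect:
#
#   values = RowOutputTransformation(adapter, locale, separators, row_columns=columns, header_rows=1)
#   rows = list(values)                    # consume the stream
#   widths = values.column_widths          # populated during the iteration above
#   runs = values.column_format_runs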


class GoogleSpreadsheetAdapter(AbstractOutputAdapter):
    """
    Writes data to Google Sheets using Google Sheets API: https://developers.google.com/sheets/
    """

    CONDITION_TYPES = {
        BoolConditionOperator.EQ: 'NUMBER_EQ',
        BoolConditionOperator.NEQ: 'NUMBER_NOT_EQ',
        BoolConditionOperator.LT: 'NUMBER_LESS',
        BoolConditionOperator.LTE: 'NUMBER_LESS_THAN_EQ',
        BoolConditionOperator.GT: 'NUMBER_GREATER',
        BoolConditionOperator.GTE: 'NUMBER_GREATER_THAN_EQ',
        BoolConditionOperator.BETWEEN: 'NUMBER_BETWEEN',
        BoolConditionOperator.NOT_BETWEEN: 'NUMBER_NOT_BETWEEN',
        BoolConditionOperator.CUSTOM_FORMULA: 'CUSTOM_FORMULA',
    }

    SPARKLINE_TYPES = {
        SparklineType.LINE: 'line',
        SparklineType.COLUMN: 'column',
    }

    MAX_SHEET_ID = 2**31

    CHART_X_MARGIN = 5
    CHART_Y_MARGIN = 10

    AVG_CHAR_WIDTH = 7.6
    """Average character width in pixels."""

    def __init__(self, output_ctx: OutputContext):
        super().__init__(output_ctx)
        self._connector: GoogleSpreadsheetConnector = None

        # Current and previous state.
        self._sheets_state: dict[str, dict[str, Any]] = {}
        self._previous_sheets: dict[str, dict[str, Any]] = {}
        self._unused_sheets = set()

        # Resolvers
        self._column_resolvers: dict[int, ColumnIndexResolver] = {}
        self._sheet_id_map: dict[int, int] = {}  # Map of internal sheet IDs to IDs in Google Sheets
        self._sheet_data_map: dict[int, SheetData] = {}

        # Current spreadsheet.
        self._spreadsheet_id: str = None
        self._spreadsheet_properties: SpreadsheetProperties = None
        self._locale = None
        self._retrieved_sheets: dict[str, RetrievedSheet] = {}
        self._sheet_id_set: set[int] = None

        # Default sheet.
        self._overwrite_default_sheet: bool = False
        self._default_sheet_id = None
        self._first_sheet = True

        # Requests batch.
        self._requests = []

    def initialize(self, task_ctx: 'task_context.TaskContextInterface'):
        """
        Initializes the connector and prepares the Google Sheets document.

        :param task_ctx: Task Context.
        """
        self._connector = GoogleSpreadsheetConnector(
            task_ctx.user_credentials, task_ctx.throttling_service_container, error_reporter=task_ctx.error_reporter
        )
        self.output_ctx.state['sheets'] = self._sheets_state

        # Load previous state
        if self.output_ctx.previous_state and 'sheets' in self.output_ctx.previous_state:
            self._previous_sheets: dict[str, dict[str, Any]] = self.output_ctx.previous_state['sheets']
            self._unused_sheets = set(self._previous_sheets.keys())

        self._spreadsheet_id = self.output_ctx.output_path

        # Try to load current information about the sheet.
        from ppc_robot_lib.output import ExcelGoogleDriveAdapter

        files_service = GoogleDriveConnector(task_ctx.user_credentials)
        self._spreadsheet_id = self._spreadsheet_id if is_not_trashed(self._spreadsheet_id, files_service) else None
        if self._spreadsheet_id:
            try:
                self._spreadsheet_properties = self.load_spreadsheet_properties(self._spreadsheet_id)
                self._retrieved_sheets = self._spreadsheet_properties.sheets
            except HttpError as exc:
                # 404 or 403 errors mean that we have to create a new spreadsheet.
                if exc.resp and (exc.resp.status == 404 or exc.resp.status == 403):
                    self._spreadsheet_id = None
                else:
                    GoogleSpreadsheetError.raise_from_http_error(exc)

        # Create a new spreadsheet if necessary.
        if not self._spreadsheet_id:
            # Discard previous state, as we are creating a new spreadsheet.
            self._previous_sheets = {}
            self._unused_sheets = set()

            self._spreadsheet_id = self.create_new_spreadsheet(task_ctx)
            self.output_ctx.output_path = self._spreadsheet_id

            try:
                self._spreadsheet_properties = self.load_spreadsheet_properties(self._spreadsheet_id)
                self._retrieved_sheets = self._spreadsheet_properties.sheets

                # When a new spreadsheet is created, it contains a single empty sheet.
                # This sheet can be renamed and overwritten instead of creating new sheets.
                try:
                    first_sheet = next(iter(self._retrieved_sheets.values()))
                    self._default_sheet_id = first_sheet.id
                    self._overwrite_default_sheet = True
                except StopIteration:
                    # No sheets, ignore
                    pass
            except HttpError as exc:
                GoogleSpreadsheetError.raise_from_http_error(exc)

        try:
            self._locale = babel.Locale.parse(self._spreadsheet_properties.locale_code)
        except (ValueError, babel.UnknownLocaleError):
            self._locale = None

    def create_new_spreadsheet(self, task_ctx: 'task_context.TaskContextInterface') -> str:
        """
        Creates a new empty Spreadsheet on Google Drive.

        :param task_ctx: Task Context.
        :return: ID of the new Spreadsheet.
        """
        try:
            folder_id = get_target_folder(task_ctx.job, task_ctx.user_credentials)
            drive_connector = GoogleDriveConnector(
                task_ctx.user_credentials,
                task_ctx.throttling_service_container,
                error_reporter=task_ctx.error_reporter,
            )
            sheet_metadata = {
                'name': task_ctx.job.name,
                'mimeType': 'application/vnd.google-apps.spreadsheet',
                'parents': [folder_id],
            }
            new_file = drive_connector.files_create(body=sheet_metadata, fields='id')
            return new_file.get('id')
        except HttpError as exc:
            GoogleDriveError.raise_from_http_error(exc)

    def load_spreadsheet_properties(self, spreadsheet_id) -> SpreadsheetProperties | None:
        """
        Loads the spreadsheet properties and list of all sheets present in the spreadsheet.

        :param spreadsheet_id: Spreadsheet ID.
        :return: Properties of the spreadsheet.
        """
        properties = SpreadsheetProperties()

        # Load spreadsheet properties:
        spreadsheet = self._connector.get(spreadsheet_id)
        if 'locale' in spreadsheet['properties']:
            properties.locale_code = spreadsheet['properties']['locale']
        if 'title' in spreadsheet['properties']:
            properties.title = spreadsheet['properties']['title']

        # Create a dictionary of retrieved sheets.
        for sheet in spreadsheet['sheets']:
            properties.sheets[sheet['properties']['title']] = self.extract_sheet_properties(sheet)

        return properties

    def write_output(self, task_ctx: 'task_context.TaskContextInterface', sheets: list[SheetData]):
        """
        Writes the given spreadsheets.

        :param task_ctx: Task Context.
        :param sheets: List of spreadsheets to write.
        """
        prepared_sheets = self.prepare_sheets(sheets)

        #
        # Create all sheets and fill them with data.
        #
        for sheet in prepared_sheets:
            sheet_data = sheet.data

            # Get the new header. If no mapping was given, use the last header row.
            if sheet_data.row_columns:
                new_headers = [col.name for col in sheet_data.row_columns]
                column_resolver = ColumnIndexResolver(sheet_data.row_columns)
            else:
                new_headers = sheet_data.data[sheet_data.header_row_count - 1]
                column_resolver = ColumnIndexResolver([])

            if sheet_data.embedded_charts:
                new_headers.append('__charts__')

            target_rows = len(sheet_data.data)

            # If this is the first sheet written, add rename operation if required.
            if self._first_sheet:
                self._first_sheet = False
                if self._spreadsheet_properties.title != task_ctx.job.name:
                    self._enqueue_batch_update({
                        'updateSpreadsheetProperties': {
                            'properties': {
                                'title': task_ctx.job.name,
                            },
                            'fields': 'title',
                        }
                    })

            sheet_id, row_count, total_columns, used_columns, column_additions, resize_all_cols = (
                self.create_or_update_sheet_properties(new_headers, sheet, target_rows)
            )

            if sheet_data.sheet_id:
                self._column_resolvers[sheet_data.sheet_id] = column_resolver
                self._sheet_id_map[sheet_data.sheet_id] = sheet_id
                self._sheet_data_map[sheet_data.sheet_id] = sheet_data

            #
            # Create merge definitions for the header.
            #
            for merge_def in sheet_data.merges:
                self._enqueue_batch_update({
                    'mergeCells': {
                        'mergeType': 'MERGE_ALL',
                        'range': {
                            'sheetId': sheet_id,
                            'startRowIndex': merge_def.start_row,
                            'endRowIndex': merge_def.end_row,
                            'startColumnIndex': merge_def.start_column,
                            'endColumnIndex': merge_def.end_column,
                        },
                    }
                })

            #
            # Create formatting for the header.
            #
            if len(sheet_data.formats) > 0:
                row_data = [
                    {'values': list(map(lambda x: {'userEnteredFormat': self.create_format(x)}, row))}
                    for row in sheet_data.formats
                ]
                self._enqueue_batch_update({
                    'updateCells': {
                        'rows': row_data,
                        'fields': 'userEnteredFormat',
                        'start': {'sheetId': sheet_id, 'rowIndex': 0, 'columnIndex': 0},
                    }
                })

            #
            # Create value formats and conditional rules.
            #
            cond_rule_index = 0
            if sheet_data.row_columns is not None and target_rows > sheet_data.header_row_count:
                for index, column in enumerate(sheet_data.row_columns):
                    # Cell value formats.
                    number_format = None
                    if column.cell_format:
                        if column.cell_format.format_type is FormatType.CURRENCY and task_ctx.currency:
                            number_format = copy(column.cell_format)
                            number_format.pattern = OfficeCurrencyFormat.get_format_string(
                                task_ctx.currency, self._locale
                            )
                        else:
                            number_format = column.cell_format

                    user_format = {
                        'verticalAlignment': column.vertical_alignment.value.upper(),
                    }
                    if number_format:
                        user_format['numberFormat'] = number_format.to_dict()
                    if column.horizontal_alignment:
                        user_format['horizontalAlignment'] = column.horizontal_alignment.value.upper()

                    self._enqueue_batch_update({
                        'repeatCell': {
                            'range': {
                                'sheetId': sheet_id,
                                'startRowIndex': sheet_data.header_row_count,
                                'startColumnIndex': index,
                                'endColumnIndex': index + 1,
                            },
                            'cell': {
                                'userEnteredFormat': user_format,
                            },
                            'fields': 'userEnteredFormat',
                        }
                    })

                    # Conditional rules.
                    if column.format_rules:
                        for format_rule in column.format_rules:
                            rule = self.create_format_rule(
                                format_rule, self._locale, sheet_data.row_columns, sheet_data.header_row_count
                            )
                            rule['ranges'] = [
                                {
                                    'sheetId': sheet_id,
                                    'startRowIndex': sheet_data.header_row_count,
                                    'startColumnIndex': index,
                                    'endColumnIndex': index + 1,
                                }
                            ]
                            self._enqueue_batch_update({
                                'addConditionalFormatRule': {
                                    'rule': rule,
                                    'index': cond_rule_index,
                                }
                            })
                            cond_rule_index += 1

            try:
                # Send the first batch of requests.
                self._flush_batch_updates()

                # Record sheet to new state - sheet has been prepared and has a proper name:
                self._sheets_state[sheet_data.sheet_name] = {'id': sheet_data.sheet_id, 'headers': new_headers}

                #
                # Prepare the output stream that will analyze the data during the output encoding.
                #
                formula_separators = self.get_formula_separators(self._locale)
                values = RowOutputTransformation(
                    sheet_data.data,
                    self._locale,
                    formula_separators,
                    sheet_data.row_columns,
                    sheet_data.currency_column_index,
                    sheet_data.header_row_count,
                )

                #
                # Write data.
                #
                sheet_range = (
                    f'{sheet_data.sheet_name}!{rowcol_to_a1(1, 1)}:{rowcol_to_a1(target_rows, len(new_headers))}'
                )
                self._connector.values_update(
                    self._spreadsheet_id, sheet_range, value_input_option='USER_ENTERED', body={'values': values}
                )

                column_format_runs = values.column_format_runs
                column_widths = values.column_widths
                column_cell_formats = values.column_cell_formats
                column_text_widths = values.column_text_widths

                target_widths = []
                if sheet_data.row_columns:
                    for col_index, col in enumerate(sheet_data.row_columns):
                        target_width = None
                        if col.width:
                            target_width = self.text_width_to_px(col.width)
                        elif col.max_width and col_index in column_text_widths:
                            target_width = self.text_width_to_px(min(col.max_width, column_text_widths[col_index]))

                        if col_index in column_widths:
                            if target_width:
                                target_width = max(target_width, column_widths[col_index])
                            else:
                                target_width = column_widths[col_index]

                        target_widths.append(target_width)

                #
                # Apply in-cell text-format runs, if any.
                #
                for col_idx, format_runs in column_format_runs.items():
                    self._enqueue_batch_update({
                        'updateCells': {
                            'start': {
                                'sheetId': sheet_id,
                                'rowIndex': format_runs.offset,
                                'columnIndex': col_idx,
                            },
                            'fields': 'textFormatRuns',
                            'rows': [
                                {'values': [{'textFormatRuns': runs} if runs is not None else {}]}
                                for runs in format_runs.cells
                            ],
                        }
                    })

                #
                # Apply individual cell formats, if any.
                #
                for col_idx, cell_formats in column_cell_formats.items():
                    self._enqueue_batch_update({
                        'updateCells': {
                            'start': {
                                'sheetId': sheet_id,
                                'rowIndex': cell_formats.offset,
                                'columnIndex': col_idx,
                            },
                            'fields': 'userEnteredFormat',
                            'rows': [
                                {
                                    'values': [
                                        {
                                            'userEnteredFormat': {
                                                'numberFormat': target_format.to_dict() if target_format else {},
                                            },
                                        }
                                    ]
                                }
                                for target_format in cell_formats.cells
                            ],
                        }
                    })

                self._flush_batch_updates()

                # Copy formulas if necessary.
                if total_columns > used_columns and target_rows > row_count:
                    # Read user-specified columns from the last row.
                    # If any formulas are found, they will be copied.
                    sheet_range = (
                        f'{sheet_data.sheet_name}!{rowcol_to_a1(row_count, used_columns + 1)}:'
                        f'{rowcol_to_a1(row_count, total_columns)}'
                    )
                    response = self._connector.values_get(
                        self._spreadsheet_id, sheet_range, value_render_option='FORMULA'
                    )
                    if 'values' in response and len(response['values']) > 0:
                        row = response['values'][0]
                        for index, cell in enumerate(row):
                            if isinstance(cell, str) and len(cell) > 0 and cell[0] == '=':
                                self._enqueue_batch_update({
                                    'copyPaste': {
                                        'source': {
                                            'sheetId': sheet_id,
                                            'startRowIndex': row_count - 1,
                                            'endRowIndex': row_count,
                                            'startColumnIndex': used_columns + index,
                                            'endColumnIndex': used_columns + index + 1,
                                        },
                                        'destination': {
                                            'sheetId': sheet_id,
                                            'startRowIndex': row_count,
                                            'endRowIndex': target_rows,
                                            'startColumnIndex': used_columns + index,
                                            'endColumnIndex': used_columns + index + 1,
                                        },
                                        'pasteType': 'PASTE_FORMULA',
                                        'pasteOrientation': 'NORMAL',
                                    }
                                })

                #
                # Add embedded charts if necessary:
                #
                if sheet_data.embedded_charts:
                    chart_column_index = len(new_headers) - 1
                    start_index = sheet_data.header_row_count
                    chart_factory = ChartDefinitionFactory(
                        sheet_id_map=self._sheet_id_map,
                        sheet_data_map=self._sheet_data_map,
                        column_resolvers=self._column_resolvers,
                        current_sheet_id=sheet_id,
                        current_header_rows=start_index,
                        current_column_resolver=column_resolver,
                    )

                    # Determine the maximum width of a chart
                    max_chart_width = 0
                    for chart in sheet_data.embedded_charts:
                        if chart.width > max_chart_width:
                            max_chart_width = chart.width

                    # Resize the column with charts:
                    self._enqueue_batch_update({
                        'updateDimensionProperties': {
                            'range': {
                                'sheetId': sheet_id,
                                'dimension': 'COLUMNS',
                                'startIndex': chart_column_index,
                                'endIndex': chart_column_index + 1,
                            },
                            'properties': {
                                'pixelSize': max_chart_width + 2 * self.CHART_X_MARGIN,
                            },
                            'fields': 'pixelSize',
                        }
                    })

                    # Add all charts
                    y_offset = 0
                    for chart in sheet_data.embedded_charts:
                        definition = chart_factory.create_embedded_chart_dict(
                            chart, start_index, chart_column_index, y_offset, self.CHART_X_MARGIN
                        )
                        self._enqueue_batch_update({
                            'addChart': {
                                'chart': definition,
                            }
                        })
                        y_offset += chart.height + self.CHART_Y_MARGIN

                #
                # Resize columns - either all, or just the columns that were added.
                #
                if not resize_all_cols:
                    for addition in column_additions:
                        if addition.insert_index in column_widths:
                            target_width = column_widths[addition.insert_index]
                        else:
                            target_width = None

                        if target_width is None or target_width == 0:
                            self._enqueue_batch_update({
                                'autoResizeDimensions': {
                                    'dimensions': {
                                        'sheetId': sheet_id,
                                        'dimension': 'COLUMNS',
                                        'startIndex': addition.insert_index,
                                        'endIndex': addition.insert_index + 1,
                                    }
                                }
                            })
                        else:
                            self._enqueue_batch_update({
                                'updateDimensionProperties': {
                                    'range': {
                                        'sheetId': sheet_id,
                                        'dimension': 'COLUMNS',
                                        'startIndex': addition.insert_index,
                                        'endIndex': addition.insert_index + 1,
                                    },
                                    'properties': {
                                        'pixelSize': target_width,
                                    },
                                    'fields': 'pixelSize',
                                }
                            })
                else:
                    self._enqueue_batch_update({
                        'autoResizeDimensions': {
                            'dimensions': {
                                'sheetId': sheet_id,
                                'dimension': 'COLUMNS',
                            }
                        }
                    })

                for col_index, width in enumerate(target_widths):
                    if width is not None:
                        self._enqueue_batch_update({
                            'updateDimensionProperties': {
                                'range': {
                                    'sheetId': sheet_id,
                                    'dimension': 'COLUMNS',
                                    'startIndex': col_index,
                                    'endIndex': col_index + 1,
                                },
                                'properties': {
                                    'pixelSize': width,
                                },
                                'fields': 'pixelSize',
                            }
                        })

                #
                # Resize rows - will always be performed.
                #
                if sheet_data.row_height and target_rows > sheet_data.header_row_count:
                    self._enqueue_batch_update({
                        'updateDimensionProperties': {
                            'range': {
                                'sheetId': sheet_id,
                                'dimension': 'ROWS',
                                'startIndex': sheet_data.header_row_count,
                            },
                            'properties': {
                                'pixelSize': sheet_data.row_height,
                            },
                            'fields': 'pixelSize',
                        }
                    })

                self._flush_batch_updates()
            except HttpError as exc:
                GoogleSpreadsheetError.raise_from_http_error(exc)

    def create_or_update_sheet_properties(self, new_headers: list, sheet: SheetConfiguration, target_rows: int):
        """
        Update an already existing sheet (potentially overwriting the default one), or create a new sheet
        if this is a new sheet in the report.

        :param new_headers: Table headers.
        :param sheet: Sheet configuration.
        :param target_rows: Total number of rows in the table.
        :return: Properties of the new sheet.
        """
        sheet_data = sheet.data
        column_additions: list[ItemAddition] = []
        should_set_filter = sheet_data.create_filter

        if not sheet.use_default_sheet:
            #
            # Use and update existing sheet if present.
            #
            if sheet.previous_name in self._retrieved_sheets:
                retrieved_sheet = self._retrieved_sheets[sheet.previous_name]
                sheet_id = retrieved_sheet.id
                row_count = retrieved_sheet.row_count
                used_columns = len(new_headers)

                #
                # Delete all conditional rules.
                #
                for _ in range(0, retrieved_sheet.cond_rule_count):
                    self._enqueue_batch_update({
                        'deleteConditionalFormatRule': {
                            'sheetId': sheet_id,
                            'index': 0,
                        }
                    })

                #
                # Delete all charts
                #
                for chart_id in retrieved_sheet.chart_ids:
                    self._enqueue_batch_update({
                        'deleteEmbeddedObject': {
                            'objectId': chart_id,
                        }
                    })

                #
                # Unmerge all cells.
                #
                self._enqueue_batch_update({
                    'unmergeCells': {
                        'range': {
                            'sheetId': sheet_id,
                        }
                    }
                })

                update_props = {}

                #
                # Rename the sheet if necessary.
                #
                if sheet.previous_name != sheet_data.sheet_name:
                    update_props['title'] = sheet_data.sheet_name

                #
                # Reset the frozen columns/rows if the number of dims is less than the current number of frozen dims.
                #
                reset_all_frozen = False
                if (retrieved_sheet.frozen_rows is not None and retrieved_sheet.frozen_rows >= target_rows) or (
                    retrieved_sheet.frozen_columns is not None and retrieved_sheet.frozen_columns >= used_columns
                ):
                    update_props['gridProperties'] = {
                        'frozenRowCount': 0,
                        'frozenColumnCount': 0,
                    }
                    reset_all_frozen = True

                if update_props:
                    self._enqueue_batch_update({
                        'updateSheetProperties': {
                            'properties': {'sheetId': sheet_id, **update_props},
                            'fields': ','.join(get_dict_value_paths(update_props)),
                        }
                    })

                #
                # Compute column deletions and additions.
                #
                resize_all_cols = False
                if sheet.previous_state and 'headers' in sheet.previous_state:
                    column_deletions, column_additions = compute_list_changes(
                        new_headers, sheet.previous_state['headers']
                    )
                    total_columns = retrieved_sheet.col_count - len(column_deletions) + len(column_additions)
                else:
                    column_deletions: list[ItemDeletion] = []
                    column_additions: list[ItemAddition] = []
                    if retrieved_sheet.col_count < len(new_headers):
                        for idx in range(retrieved_sheet.col_count, len(new_headers)):
                            column_additions.append(ItemAddition(idx, idx))
                    total_columns = used_columns

                # Remove all frozen columns before deleting any columns to prevent
                # the "Sorry, it is not possible to delete all non-frozen columns" error
                should_reset_frozen_cols = (
                    column_deletions
                    and not reset_all_frozen
                    and retrieved_sheet.frozen_columns is not None
                    and retrieved_sheet.frozen_columns > 0
                )
                if should_reset_frozen_cols:
                    self._enqueue_batch_update({
                        'updateSheetProperties': {
                            'properties': {
                                'sheetId': sheet_id,
                                'gridProperties': {
                                    'frozenColumnCount': 0,
                                },
                            },
                            'fields': 'gridProperties.frozenColumnCount',
                        }
                    })

                #
                # Delete columns.
                #
                for deletion in column_deletions:
                    self._enqueue_batch_update({
                        'deleteDimension': {
                            'range': {
                                'sheetId': sheet_id,
                                'dimension': 'COLUMNS',
                                'startIndex': deletion.deletion_index,
                                'endIndex': deletion.deletion_index + 1,
                            }
                        }
                    })

                #
                # Add columns.
                #
                for addition in column_additions:
                    self._enqueue_batch_update({
                        'insertDimension': {
                            'range': {
                                'sheetId': sheet_id,
                                'dimension': 'COLUMNS',
                                'startIndex': addition.insert_index,
                                'endIndex': addition.insert_index + 1,
                            },
                            'inheritFromBefore': (addition.insert_index > 0),
                        }
                    })

                # Restore previously un-frozen columns.
                if should_reset_frozen_cols:
                    self._enqueue_batch_update({
                        'updateSheetProperties': {
                            'properties': {
                                'sheetId': sheet_id,
                                'gridProperties': {
                                    'frozenColumnCount': retrieved_sheet.frozen_columns,
                                },
                            },
                            'fields': 'gridProperties.frozenColumnCount',
                        }
                    })

                #
                # Add or remove rows to match target number of rows.
                #
                if row_count < target_rows:
                    self._enqueue_batch_update({
                        'insertDimension': {
                            'range': {
                                'sheetId': sheet_id,
                                'dimension': 'ROWS',
                                'startIndex': row_count,
                                'endIndex': target_rows,
                            },
                            'inheritFromBefore': (row_count > 0),
                        }
                    })
                elif row_count > target_rows:
                    # Keep at least 1 blank row: fixes the error with deleting all but frozen rows.
                    if target_rows <= sheet_data.header_row_count:
                        start_index = sheet_data.header_row_count + 1
                    else:
                        start_index = target_rows

                    # Delete rows, but only if necessary, possibly keeping 1 empty row.
                    if row_count > start_index:
                        self._enqueue_batch_update({
                            'deleteDimension': {
                                'range': {
                                    'sheetId': sheet_id,
                                    'dimension': 'ROWS',
                                    'startIndex': start_index,
                                    'endIndex': row_count,
                                }
                            }
                        })

                    # Clear all blank rows, if any, but try to keep the formulas.
                    if start_index > target_rows:
                        self._enqueue_batch_update({
                            'repeatCell': {
                                'range': {
                                    'sheetId': sheet_id,
                                    'startRowIndex': target_rows,
                                    'endRowIndex': start_index,
                                    'startColumnIndex': 0,
                                    'endColumnIndex': used_columns,
                                },
                                'cell': {
                                    'userEnteredValue': {'stringValue': ''},
                                },
                                'fields': 'userEnteredValue',
                            }
                        })

                # Update filter views
                if sheet_data.create_filter and retrieved_sheet.basic_filter:
                    filter_view = retrieved_sheet.basic_filter
                    should_set_filter = (
                        (filter_view.start_row_index != (sheet_data.header_row_count - 1))
                        or (filter_view.end_row_index is not None and filter_view.end_row_index < target_rows)
                        or (filter_view.start_column_index is not None and filter_view.start_column_index != 0)
                        or (filter_view.end_column_index is not None and filter_view.end_column_index < used_columns)
                    )
            else:
                #
                # Create a new blank sheet.
                #
                sheet_id = self.allocate_sheet_id()
                grid_props = self.create_new_grid_properties(sheet_data, new_headers)
                self._enqueue_batch_update({
                    'addSheet': {
                        'properties': {
                            'sheetId': sheet_id,
                            'title': sheet_data.sheet_name,
                            'gridProperties': grid_props,
                        }
                    }
                })
                resize_all_cols = True
                total_columns = used_columns = grid_props['columnCount']
                row_count = target_rows
        else:
            #
            # Overwrite default sheet.
            #
            sheet_id = self._default_sheet_id
            grid_props = self.create_new_grid_properties(sheet_data, new_headers)
            self._enqueue_batch_update({
                'updateSheetProperties': {
                    'properties': {
                        'sheetId': sheet_id,
                        'title': sheet_data.sheet_name,
                        'gridProperties': grid_props,
                    },
                    'fields': 'title, gridProperties',
                }
            })
            resize_all_cols = True
            total_columns = used_columns = grid_props['columnCount']
            row_count = target_rows

        if should_set_filter:
            self._enqueue_batch_update({
                'setBasicFilter': {
                    'filter': {
                        'range': {
                            'sheetId': sheet_id,
                            'startRowIndex': sheet_data.header_row_count - 1,
                            'startColumnIndex': 0,
                            'endColumnIndex': used_columns,
                        },
                    }
                }
            })

        return sheet_id, row_count, total_columns, used_columns, column_additions, resize_all_cols

    def prepare_sheets(self, sheets: list[SheetData]) -> list[SheetConfiguration]:
        """
        Prepare sheets before the write process:

        * Load previous state and name for each sheet.
        * Determine whether the sheet should overwrite the default sheet.

        :param sheets: Input sheets.
        :return: Sheet configuration.
        """
        prepared_sheets: list[SheetConfiguration] = []

        #
        # Find and load previous sheet state for each sheet.
        for sheet_data in sheets:
            previous_name = sheet_data.sheet_name
            prev_sheet_state = None
            if sheet_data.sheet_id:
                for key, sheet_state in self._previous_sheets.items():
                    if 'id' in sheet_state and sheet_state['id'] == sheet_data.sheet_id:
                        previous_name = key
                        prev_sheet_state = sheet_state

            if prev_sheet_state is None:
                if sheet_data.sheet_name in self._previous_sheets:
                    prev_sheet_state = self._previous_sheets[sheet_data.sheet_name]

            if not self._overwrite_default_sheet:
                use_default_sheet = False
                # Mark sheet as used:
                if previous_name in self._retrieved_sheets:
                    self._unused_sheets.discard(previous_name)
                else:
                    self._unused_sheets.discard(sheet_data.sheet_name)
            else:
                use_default_sheet = True
                self._overwrite_default_sheet = False
                self._unused_sheets.discard(sheet_data.sheet_name)

            prepared_sheets.append(SheetConfiguration(sheet_data, previous_name, prev_sheet_state, use_default_sheet))

        return prepared_sheets

    def finalize(self, task_ctx: 'task_context.TaskContextInterface', exc: BaseException = None):
        """
        Deletes all sheets that were not used during the output. Only sheets that were previously created
        by the same report will be deleted.

        :param task_ctx: Task context.
        :param exc: Exception.
        """
        if exc is None:
            delete_requests = []
            for sheet_name in self._unused_sheets:
                if sheet_name in self._retrieved_sheets:
                    delete_requests.append({
                        'deleteSheet': {
                            'sheetId': self._retrieved_sheets[sheet_name].id,
                        }
                    })

            try:
                self._execute_batch_update(delete_requests)
            except HttpError:
                # Ignore any HTTP errors, as this is a non-critical operation.
                pass

    @staticmethod
    def get_formula_separators(sheet_locale: babel.Locale = None) -> FormulaSeparators:
        if sheet_locale:
            decimal_symbol = sheet_locale.number_symbols.get('latn', {}).get('decimal', '.')
            if decimal_symbol in LOCALIZED_FORMULA_SEPARATORS:
                return LOCALIZED_FORMULA_SEPARATORS[decimal_symbol]

        return LOCALIZED_FORMULA_SEPARATORS['.']

    @staticmethod
    def extract_sheet_properties(sheet: dict[str, Any]) -> RetrievedSheet:
        if 'conditionalFormats' in sheet:
            cond_formats = len(sheet['conditionalFormats'])
        else:
            cond_formats = 0

        if 'gridProperties' in sheet['properties']:
            column_count = sheet['properties']['gridProperties']['columnCount']
            row_count = sheet['properties']['gridProperties']['rowCount']
            frozen_rows = sheet['properties']['gridProperties'].get('frozenRowCount', None)
            frozen_columns = sheet['properties']['gridProperties'].get('frozenColumnCount', None)
        else:
            column_count = 0
            row_count = 0
            frozen_rows = None
            frozen_columns = None

        props = RetrievedSheet(
            sheet['properties']['sheetId'],
            cond_formats,
            column_count,
            row_count,
            frozen_rows=frozen_rows,
            frozen_columns=frozen_columns,
        )

        if 'basicFilter' in sheet:
            props.basic_filter = FilterView(
                id=None,
                start_row_index=sheet['basicFilter']['range'].get('startRowIndex', None),
                end_row_index=sheet['basicFilter']['range'].get('endRowIndex', None),
                start_column_index=sheet['basicFilter']['range'].get('startColumnIndex', None),
                end_column_index=sheet['basicFilter']['range'].get('endColumnIndex', None),
            )

        if 'charts' in sheet:
            for chart in sheet['charts']:
                props.chart_ids.append(chart['chartId'])

        return props

    @staticmethod
    def create_new_grid_properties(sheet_data: SheetData, new_headers: list = None):
        if new_headers:
            column_count = len(new_headers)
        else:
            column_count = len(sheet_data.row_columns) if sheet_data.row_columns else len(sheet_data.data[0])

        row_count = max(len(sheet_data.data), sheet_data.header_row_count + 1)
        props = {'rowCount': row_count, 'columnCount': max(column_count, 1), 'hideGridlines': False}
        if sheet_data.freeze_header and sheet_data.header_row_count < len(sheet_data.data):
            props['frozenRowCount'] = sheet_data.header_row_count
        if sheet_data.freeze_columns:
            props['frozenColumnCount'] = min(sheet_data.freeze_columns, column_count - 1)

        return props

    @staticmethod
    def create_format(style: CellStyle):
        if style is not None and not style.empty:
            return style.to_dict()
        else:
            return {}

    @staticmethod
    def format_value(value, locale: babel.Locale = None, row_columns: list[Column] = None, header_size: int = None):
        value = str(value)

        # Convert column references in column.
        if len(value) > 0 and value[0] == '=':
            return expand_column_references(value, row_columns, header_size + 1)

        # Convert decimal separator in numbers.
        if locale is not None:
            symbol = locale.number_symbols.get('latn', {}).get('decimal', '.')
            if symbol != '.':
                value = value.replace('.', symbol)

        return value

    @classmethod
    def create_format_rule(
        cls, format_rule, sheet_locale: babel.Locale = None, row_columns: list[Column] = None, header_size: int = None
    ):
        if isinstance(format_rule, BoolConditionRule):
            if isinstance(format_rule.value, list):
                values = [
                    {'userEnteredValue': cls.format_value(value, sheet_locale, row_columns, header_size)}
                    for value in format_rule.value
                ]
            elif format_rule.value is not None:
                values = [
                    {'userEnteredValue': cls.format_value(format_rule.value, sheet_locale, row_columns, header_size)}
                ]
            else:
                values = []

            return {
                'booleanRule': {
                    'condition': {
                        'type': cls.CONDITION_TYPES.get(format_rule.op),
                        'values': values,
                    },
                    'format': format_rule.style.to_dict(),
                }
            }
        elif isinstance(format_rule, GradientRule):

            def create_gradient_point(point: GradientPoint):
                return {
                    'color': point.color.to_normalized_dict(),
                    'type': point.value_type.name,
                    'value': cls.format_value(point.value, sheet_locale) if point.value is not None else '',
                }

            return {
                'gradientRule': {
                    'minpoint': create_gradient_point(format_rule.point_min),
                    'midpoint': create_gradient_point(format_rule.point_mid),
                    'maxpoint': create_gradient_point(format_rule.point_max),
                }
            }
        else:
            raise ValueError(f'Unsupported rule type: {type(format_rule)}.')

    @staticmethod
    def create_text_format_runs(text: FormattedText):
        offset = 0
        runs = []
        default_fmt = {}
        for node in text:
            if isinstance(node, FormattedTextNode):
                fmt = node.text_format.to_dict()
            else:
                fmt = default_fmt

            runs.append({
                'startIndex': offset,
                'format': fmt,
            })
            offset += len(node)

        return runs

    @staticmethod
    def format_str(value: str):
        escaped = value.replace('"', '" & CHAR(34) & "')
        return f'"{escaped}"'

    @classmethod
    def formula_value(cls, value, sheet_locale: babel.Locale):
        if isinstance(value, str):
            return cls.format_str(value)
        else:
            return cls.format_value(value, sheet_locale)

    @classmethod
    def create_sparkline_formula(cls, sparkline: Sparkline, locale: babel.Locale, separators: FormulaSeparators):
        if len(sparkline.values) < 2:
            return '', 20

        values = separators.rows.join(map(cls.format_value, sparkline.values, itertools.repeat(locale)))
        values = f'{{{values}}}'

        opt_dict = cls.create_sparkline_options(sparkline.options, sparkline.color_override)
        options = separators.rows.join(
            f'{cls.format_str(k)}{separators.cells}{cls.formula_value(v, locale)}' for k, v in opt_dict.items()
        )
        options = f'{{{options}}}'

        if sparkline.options.type == SparklineType.LINE:
            width = 10 * (len(sparkline.values) - 1)
        elif sparkline.options.type == SparklineType.COLUMN:
            width = 8 * len(sparkline.values)
        else:
            width = 60

        return f'=SPARKLINE({values}{separators.parameters}{options})', width

    @classmethod
    def create_sparkline_options(cls, options: SparklineOptions, color_override: Color = None):
        opt_dict = {'charttype': cls.SPARKLINE_TYPES[options.type]}
        if options.y_min is not None:
            opt_dict['ymin'] = options.y_min
        if options.y_max is not None:
            opt_dict['ymax'] = options.y_max
        if options.line_width is not None:
            opt_dict['linewidth'] = round(options.line_width)

        if color_override is not None:
            opt_dict['color'] = color_override.to_hex_str()
        elif options.color is not None:
            opt_dict['color'] = options.color.to_hex_str()

        if options.negative_color is not None:
            opt_dict['negcolor'] = options.negative_color.to_hex_str()
        if options.low_color is not None:
            opt_dict['lowcolor'] = options.low_color.to_hex_str()
        if options.high_color is not None:
            opt_dict['highcolor'] = options.high_color.to_hex_str()

        return opt_dict

    def text_width_to_px(self, text_width):
        return round(text_width * self.AVG_CHAR_WIDTH)

    def allocate_sheet_id(self) -> int:
        """
        Gets a random sheet ID that is not already in use.

        :return: Unique sheet ID.
        """
        if self._sheet_id_set is None:
            self._sheet_id_set = set(sheet.id for sheet in self._retrieved_sheets.values())

        sheet_id = random.randint(0, self.MAX_SHEET_ID)
        while sheet_id in self._sheet_id_set:
            sheet_id = random.randint(0, self.MAX_SHEET_ID)

        self._sheet_id_set.add(sheet_id)
        return sheet_id

    def _enqueue_batch_update(self, request: dict[str, Any]):
        """
        Adds the given batch update request to the queue. All queued requests are executed by calling
        :py:meth:`_flush_batch_updates`.

        :param request: Request dictionary to add.
            See https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/request
        """
        self._requests.append(request)

    def _flush_batch_updates(self):
        """
        Executes all queued batch updates and clears the current queue.
        """
        self._execute_batch_update(self._requests)
        self._requests.clear()

    def _execute_batch_update(self, requests: list[dict[str, Any]]):
        """
        Executes the given batchUpdate requests.

        :param requests: List of requests.
        :return: Response from the batchUpdate call.
        """
        if requests:
            return self._connector.batch_update(
                self._spreadsheet_id,
                body={
                    'requests': requests,
                },
            )
        else:
            return None
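
# Illustrative adapter lifecycle (hypothetical `output_ctx`, `task_ctx` and `sheets`
# objects; the exact call order is driven by the surrounding task framework, so treat
# this as a sketch rather than the canonical flow):
#
#   adapter = GoogleSpreadsheetAdapter(output_ctx)
#   adapter.initialize(task_ctx)             # open or create the target spreadsheet
#   adapter.write_output(task_ctx, sheets)   # write every SheetData with its formatting
#   adapter.finalize(task_ctx)               # drop sheets the report no longer produces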