import itertools
import random
from collections import defaultdict, namedtuple
from copy import copy
from typing import Any
import babel
from dataclasses import dataclass, field
from googleapiclient.errors import HttpError
from ppc_robot_lib.google_drive import GoogleDriveConnector
from ppc_robot_lib.google_drive.connector import is_not_trashed
from ppc_robot_lib.google_drive.exceptions import GoogleDriveError
from ppc_robot_lib.google_drive.folder import get_target_folder
from ppc_robot_lib.google_spreadsheet import GoogleSpreadsheetConnector
from ppc_robot_lib.google_spreadsheet.exceptions import GoogleSpreadsheetError
from ppc_robot_lib.output.abstract_adapter import AbstractOutputAdapter
from ppc_robot_lib.output.currency import OfficeCurrencyFormat
from ppc_robot_lib.output.google_sheet.charts import ChartDefinitionFactory
from ppc_robot_lib.output.iter import SheetDataAdapter
from ppc_robot_lib.output.resolver import ColumnIndexResolver
from ppc_robot_lib.output.types import OutputContext, SheetData
from ppc_robot_lib.output.utils import expand_column_references, get_text_width
from ppc_robot_lib.tasks import task_context
from ppc_robot_lib.utils.changes import ItemAddition, ItemDeletion, compute_list_changes
from ppc_robot_lib.utils.dictionary import get_dict_value_paths
from ppc_robot_lib.utils.types import (
BoolConditionOperator,
BoolConditionRule,
CellStyle,
Color,
Column,
FormatType,
FormattedText,
FormattedTextNode,
GradientPoint,
GradientRule,
Image,
Sparkline,
SparklineOptions,
SparklineType,
Format,
)
# Separators used when composing formulas: ``parameters`` separates function
# arguments; ``cells`` and ``rows`` presumably separate the cells and rows of
# inline array literals (verify against the formula builders).
FormulaSeparators = namedtuple('FormulaSeparators', 'parameters cells rows')

# Code point directly before 'A'; adding a 1-based letter index (1..26) yields 'A'..'Z'.
LETTER_OFFSET = ord('A') - 1

# Formula separators keyed by the decimal point symbol of the spreadsheet locale.
LOCALIZED_FORMULA_SEPARATORS: dict[str, FormulaSeparators] = {
    '.': FormulaSeparators(',', ',', ';'),
    ',': FormulaSeparators(';', '\\', ';'),
}
def rowcol_to_a1(row, col):
    """Translates a row and column cell address to A1 notation.

    :param row: The row of the cell to be converted.
        Rows start at index 1.
    :param col: The column of the cell to be converted.
        Columns start at index 1.
    :returns: a string containing the cell's coordinates in A1 notation.
        A column of ``0`` produces an empty column label, i.e. only the row number is returned.
    :raises ValueError: if the column is negative (the conversion loop would never terminate).

    Example:

    >>> rowcol_to_a1(1, 1)
    'A1'
    >>> rowcol_to_a1(10, 28)
    'AB10'
    """
    remaining = int(col)
    if remaining < 0:
        raise ValueError(f'Column index must not be negative, got {col!r}.')

    # Column labels are a bijective base-26 number: there is no zero digit, so each
    # step shifts the value by one before splitting off the least significant letter.
    letters = []
    while remaining:
        remaining, letter_index = divmod(remaining - 1, 26)
        letters.append(chr(ord('A') + letter_index))

    column_label = ''.join(reversed(letters))
    return f'{column_label}{int(row)}'
@dataclass()
class FilterView:
id: int | None
start_row_index: int | None
end_row_index: int | None
start_column_index: int | None
end_column_index: int | None
@dataclass()
class RetrievedSheet:
    """Properties of a sheet that already exists in the spreadsheet, as loaded from the API response."""

    # ID of the sheet in Google Sheets.
    id: int
    # Number of conditional format rules defined on the sheet.
    cond_rule_count: int
    # Size of the sheet grid.
    col_count: int
    row_count: int
    # Basic filter of the sheet; None when the sheet has no basic filter.
    basic_filter: FilterView | None = None
    # IDs of the charts embedded in the sheet.
    chart_ids: list[int] = field(default_factory=list)
    # Number of frozen rows/columns; None when not reported.
    frozen_rows: int | None = None
    frozen_columns: int | None = None
@dataclass()
class SpreadsheetProperties:
    """Properties of a loaded spreadsheet document together with its sheets."""

    # Title of the spreadsheet; None when not present in the loaded properties.
    title: str | None = None
    # Locale code of the spreadsheet; None when not present in the loaded properties.
    locale_code: str | None = None
    # Sheets of the spreadsheet keyed by sheet title.
    sheets: dict[str, RetrievedSheet] = field(default_factory=dict)
@dataclass()
class SheetConfiguration:
    """Sheet to be written, paired with what is known about it from the previous run."""

    # Data and settings of the sheet to write.
    data: SheetData
    # Name under which the sheet was stored previously; equals the current name when no rename is known.
    previous_name: str
    # State stored for the sheet by the previous run; None when no previous state was found.
    previous_state: dict[str, Any] | None
    # True if the sheet should overwrite the default sheet of a newly created spreadsheet.
    use_default_sheet: bool
class ColumnFormatRun:
    """
    Text format runs of a single column, stored as a dense list of cells.

    ``cells[0]`` belongs to the row with index ``offset``; rows without a format run are ``None``.
    """

    def __init__(self, offset):
        """
        :param offset: Index of the row the first stored cell belongs to.
        """
        self.offset = offset
        self.cells = []

    def add_format_run(self, index, format_run):
        """
        Appends the format run of the row with the given index.

        Rows skipped since the last stored cell are filled with ``None``. The value is always appended,
        so rows are expected to be added in increasing order.

        :param index: Row index of the cell.
        :param format_run: Format runs of the cell.
        """
        missing = (index - self.offset) - len(self.cells)
        # A non-positive count multiplies to an empty list, so nothing is padded in that case.
        self.cells.extend([None] * missing)
        self.cells.append(format_run)
class CellFormats:
    """
    Individual cell formats of a single column, stored as a dense list of cells.

    ``cells[0]`` belongs to the row with index ``offset``; rows without an individual format are ``None``.
    """

    def __init__(self, offset):
        """
        :param offset: Index of the row the first stored cell belongs to.
        """
        self.offset = offset
        self.cells: list[Format | None] = []

    def add_format(self, index, fmt: Format):
        """
        Appends the format of the row with the given index.

        Rows skipped since the last stored cell are filled with ``None``. The value is always appended,
        so rows are expected to be added in increasing order.

        :param index: Row index of the cell.
        :param fmt: Format of the cell.
        """
        missing = (index - self.offset) - len(self.cells)
        # A non-positive count multiplies to an empty list, so nothing is padded in that case.
        self.cells.extend([None] * missing)
        self.cells.append(fmt)
class RowOutputTransformation(list):
    """
    Row stream that converts table values to the form written to the spreadsheet.

    Rows of the wrapped data are converted in place while the object is iterated. During the iteration,
    the object collects the properties that have to be applied after the values are written: text format
    runs, individual cell formats and column widths. These properties are therefore complete only after
    the object has been iterated to the end.

    The class derives from ``list`` without calling the base constructor (presumably so that consumers
    expecting a list accept it). The list storage itself stays empty; only ``len()`` and iteration are
    meaningful.
    """

    # noinspection PyMissingConstructor
    def __init__(
        self,
        sheet_data: SheetDataAdapter,
        locale,
        formula_separators,
        row_columns: list[Column] | None = None,
        currency_column_index: int | None = None,
        header_rows: int = 0,
    ):
        """
        :param sheet_data: Rows to transform.
        :param locale: Locale used for sparkline formulas and currency formats.
        :param formula_separators: Separators used when composing formulas.
        :param row_columns: Column definitions of the table, if any.
        :param currency_column_index: Index of the column holding the currency of each row, if any.
        :param header_rows: Number of header rows; they are ignored when measuring text widths.
        """
        self._data = sheet_data
        self._locale = locale
        self._formula_separators = formula_separators
        self._row_columns = row_columns if row_columns is not None else []
        self._currency_column_index = currency_column_index
        self._header_rows = header_rows
        self._column_widths: dict[int, int] = defaultdict(int)
        self._column_format_runs: dict[int, ColumnFormatRun] = {}
        self._cell_formats: dict[int, CellFormats] = {}
        self._text_widths: dict[int, int] = defaultdict(int)
        self._currency_columns = self.get_currency_columms()
        self._max_width_columns = self.get_columns_with_max_width()

    def get_currency_columms(self):
        """
        :return: Cell formats of all columns formatted as currency, keyed by column index.
        """
        return {
            index: column.cell_format
            for index, column in enumerate(self._row_columns)
            if column.cell_format and column.cell_format.format_type == FormatType.CURRENCY
        }

    def get_columns_with_max_width(self):
        """
        :return: Set of indexes of columns that have a maximum width set.
        """
        return {index for index, column in enumerate(self._row_columns) if column.max_width}

    @property
    def column_widths(self) -> dict[int, int]:
        """Widths required by sparklines, keyed by column index."""
        return self._column_widths

    @property
    def column_cell_formats(self) -> dict[int, CellFormats]:
        """Individual (per-row currency) cell formats, keyed by column index."""
        return self._cell_formats

    @property
    def column_format_runs(self) -> dict[int, ColumnFormatRun]:
        """Text format runs of formatted-text cells, keyed by column index."""
        return self._column_format_runs

    @property
    def column_text_widths(self) -> dict[int, int]:
        """Widest measured text of each column that has a maximum width, keyed by column index."""
        return self._text_widths

    def __len__(self):
        return len(self._data)

    def __iter__(self):
        return self._iter_generator()

    def _iter_generator(self):
        """Yields the rows of the wrapped data, converting their values in place."""
        _locale = self._locale
        _formula_separators = self._formula_separators
        currency_column_index = self._currency_column_index

        for row_idx, row in enumerate(self._data):
            # Compare with None explicitly: 0 is a valid column index and must not disable the lookup.
            if currency_column_index is not None:
                row_currency = row[currency_column_index]
            else:
                row_currency = None

            for col_idx, value in enumerate(row):
                # Set for values replaced by formulas/rich text, whose width cannot be measured as text.
                has_object = False

                if isinstance(value, FormattedText):
                    row[col_idx] = str(value)
                    if col_idx not in self._column_format_runs:
                        self._column_format_runs[col_idx] = ColumnFormatRun(row_idx)
                    self._column_format_runs[col_idx].add_format_run(
                        row_idx, GoogleSpreadsheetAdapter.create_text_format_runs(value)
                    )
                    has_object = True
                elif isinstance(value, Image):
                    # Double quotes cannot appear inside the formula string literal, splice in CHAR(34).
                    quoted_url = value.url.replace('"', '" & CHAR(34) & "')
                    row[col_idx] = f'=HYPERLINK("{quoted_url}"; IMAGE("{quoted_url}"))'
                    has_object = True
                elif isinstance(value, Sparkline):
                    formula, width = GoogleSpreadsheetAdapter.create_sparkline_formula(
                        value, _locale, _formula_separators
                    )
                    row[col_idx] = formula
                    if width > self._column_widths[col_idx]:
                        self._column_widths[col_idx] = width
                    has_object = True
                elif isinstance(value, str):
                    # Leading apostrophe makes the spreadsheet treat the value as plain text.
                    row[col_idx] = f"'{value}"
                elif row_currency is not None and col_idx in self._currency_columns:
                    # Currency value: format the cell with the currency of its own row.
                    if col_idx not in self._cell_formats:
                        self._cell_formats[col_idx] = CellFormats(row_idx)
                    cell_format = copy(self._currency_columns[col_idx])
                    cell_format.pattern = OfficeCurrencyFormat.get_format_string(row_currency, self._locale)
                    self._cell_formats[col_idx].add_format(row_idx, cell_format)

                # Measure the original value (not the converted one) of data rows.
                if col_idx in self._max_width_columns and not has_object and row_idx >= self._header_rows:
                    text_width = get_text_width(value, self._row_columns[col_idx].cell_format)
                    if text_width > self._text_widths[col_idx]:
                        self._text_widths[col_idx] = text_width

            yield row
class GoogleSpreadsheetAdapter(AbstractOutputAdapter):
    """
    Writes data to Google Sheets using Google Sheets API:
    https://developers.google.com/sheets/
    """

    # Names of condition types for each supported operator of boolean conditional rules.
    CONDITION_TYPES = {
        BoolConditionOperator.EQ: 'NUMBER_EQ',
        BoolConditionOperator.NEQ: 'NUMBER_NOT_EQ',
        BoolConditionOperator.LT: 'NUMBER_LESS',
        BoolConditionOperator.LTE: 'NUMBER_LESS_THAN_EQ',
        BoolConditionOperator.GT: 'NUMBER_GREATER',
        BoolConditionOperator.GTE: 'NUMBER_GREATER_THAN_EQ',
        BoolConditionOperator.BETWEEN: 'NUMBER_BETWEEN',
        BoolConditionOperator.NOT_BETWEEN: 'NUMBER_NOT_BETWEEN',
        BoolConditionOperator.CUSTOM_FORMULA: 'CUSTOM_FORMULA',
    }
    # Names of chart types for each supported sparkline type.
    SPARKLINE_TYPES = {
        SparklineType.LINE: 'line',
        SparklineType.COLUMN: 'column',
    }
    # NOTE(review): presumably the upper bound for generated sheet IDs - confirm against allocate_sheet_id().
    MAX_SHEET_ID = 2**31
    # Horizontal margin of embedded charts; the chart column is as wide as the widest chart plus twice this margin.
    CHART_X_MARGIN = 5
    # Vertical gap added after each embedded chart when charts are stacked below each other.
    CHART_Y_MARGIN = 10
    AVG_CHAR_WIDTH = 7.6
    """Average character width in pixels."""
def __init__(self, output_ctx: OutputContext):
    """
    Creates the adapter with an empty state; nothing is loaded until ``initialize`` is called.

    :param output_ctx: Output context.
    """
    super().__init__(output_ctx)

    # Connector and the spreadsheet being written.
    self._connector: GoogleSpreadsheetConnector = None
    self._spreadsheet_id: str = None
    self._spreadsheet_properties: SpreadsheetProperties = None
    self._locale = None
    self._retrieved_sheets: dict[str, RetrievedSheet] = {}
    self._sheet_id_set: set[int] = None

    # State written by this run, state of the previous run, and names of previous sheets not used so far.
    self._sheets_state: dict[str, dict[str, Any]] = {}
    self._previous_sheets: dict[str, dict[str, Any]] = {}
    self._unused_sheets = set()

    # Lookup tables keyed by internal sheet IDs.
    self._column_resolvers: dict[int, ColumnIndexResolver] = {}
    # Map of internal sheet IDs to IDs in Google Sheets.
    self._sheet_id_map: dict[int, int] = {}
    self._sheet_data_map: dict[int, SheetData] = {}

    # Handling of the default sheet of a new spreadsheet and of the first sheet written.
    self._overwrite_default_sheet: bool = False
    self._default_sheet_id = None
    self._first_sheet = True

    # Batch of requests waiting to be sent.
    self._requests = []
def initialize(self, task_ctx: 'task_context.TaskContextInterface'):
    """
    Initializes the connector and prepares the Google Sheets document.

    The spreadsheet given by the output path is reused if it is not trashed and its properties can be
    loaded. Otherwise a new spreadsheet is created, the previous state is discarded and the first sheet
    of the new document is marked to be overwritten by the first sheet written.

    :param task_ctx: Task Context.
    """
    self._connector = GoogleSpreadsheetConnector(
        task_ctx.user_credentials, task_ctx.throttling_service_container, error_reporter=task_ctx.error_reporter
    )
    self.output_ctx.state['sheets'] = self._sheets_state

    # Load previous state
    previous_state = self.output_ctx.previous_state
    if previous_state and 'sheets' in previous_state:
        self._previous_sheets = previous_state['sheets']
        self._unused_sheets = set(self._previous_sheets.keys())

    self._spreadsheet_id = self.output_ctx.output_path

    # NOTE(review): the imported name is not used here; kept in case the import is relied upon - confirm and remove.
    from ppc_robot_lib.output import ExcelGoogleDriveAdapter  # noqa: F401

    # Check the trashed flag only when there is a spreadsheet to check: on the first run there is no ID yet.
    if self._spreadsheet_id:
        files_service = GoogleDriveConnector(task_ctx.user_credentials)
        if not is_not_trashed(self._spreadsheet_id, files_service):
            self._spreadsheet_id = None

    # Try to load current information about the sheet.
    if self._spreadsheet_id:
        try:
            self._spreadsheet_properties = self.load_spreadsheet_properties(self._spreadsheet_id)
            self._retrieved_sheets = self._spreadsheet_properties.sheets
        except HttpError as exc:
            # 404 or 403 errors means that we have to create a new spreadsheet.
            if exc.resp and exc.resp.status in (404, 403):
                self._spreadsheet_id = None
            else:
                GoogleSpreadsheetError.raise_from_http_error(exc)

    # Create a new spreadsheet if necessary.
    if not self._spreadsheet_id:
        # Discard previous state, as we are creating a new spreadsheet.
        self._previous_sheets = {}
        self._unused_sheets = set()

        self._spreadsheet_id = self.create_new_spreadsheet(task_ctx)
        self.output_ctx.output_path = self._spreadsheet_id

        try:
            self._spreadsheet_properties = self.load_spreadsheet_properties(self._spreadsheet_id)
            self._retrieved_sheets = self._spreadsheet_properties.sheets

            # When a new spreadsheet is created, it contains a single empty sheet.
            # This sheet can be renamed and overwritten instead of creating new sheets.
            first_sheet = next(iter(self._retrieved_sheets.values()), None)
            if first_sheet is not None:
                self._default_sheet_id = first_sheet.id
                self._overwrite_default_sheet = True
        except HttpError as exc:
            GoogleSpreadsheetError.raise_from_http_error(exc)

    # The locale is optional in the loaded properties. Do not pass a missing code to the parser:
    # depending on the Babel version, a None identifier may raise an error that is not handled below.
    self._locale = None
    locale_code = self._spreadsheet_properties.locale_code
    if locale_code:
        try:
            self._locale = babel.Locale.parse(locale_code)
        except (ValueError, babel.UnknownLocaleError):
            self._locale = None
def create_new_spreadsheet(self, task_ctx: 'task_context.TaskContextInterface') -> str:
    """
    Creates a new, empty Spreadsheet file on Google Drive. The file is named after the job
    and placed into the target folder of the job.

    :param task_ctx: Task Context.
    :return: ID of the new Spreadsheet.
    """
    try:
        parent_folder = get_target_folder(task_ctx.job, task_ctx.user_credentials)
        drive = GoogleDriveConnector(
            task_ctx.user_credentials, task_ctx.throttling_service_container, error_reporter=task_ctx.error_reporter
        )
        created_file = drive.files_create(
            body={
                'name': task_ctx.job.name,
                'mimeType': 'application/vnd.google-apps.spreadsheet',
                'parents': [parent_folder],
            },
            fields='id',
        )
    except HttpError as exc:
        # Convert errors of the Drive API to the error type of this library.
        GoogleDriveError.raise_from_http_error(exc)
    else:
        return created_file.get('id')
def load_spreadsheet_properties(self, spreadsheet_id) -> SpreadsheetProperties | None:
    """
    Fetches the spreadsheet and extracts its locale, title and the properties of all its sheets.

    :param spreadsheet_id: Spreadsheet ID.
    :return: Properties of the spreadsheet; the sheets are keyed by their title.
    """
    loaded = SpreadsheetProperties()
    spreadsheet = self._connector.get(spreadsheet_id)
    document_props = spreadsheet['properties']

    # Copy only the properties that are present, the others keep their defaults.
    for attribute, key in (('locale_code', 'locale'), ('title', 'title')):
        if key in document_props:
            setattr(loaded, attribute, document_props[key])

    loaded.sheets.update(
        (sheet['properties']['title'], self.extract_sheet_properties(sheet)) for sheet in spreadsheet['sheets']
    )
    return loaded
def write_output(self, task_ctx: 'task_context.TaskContextInterface', sheets: list[SheetData]):
    """
    Writes the given sheets to the spreadsheet.

    For each sheet, the target sheet is created or updated, header merges and formats, column formats
    and conditional format rules are applied, and the values are written. Afterwards, the properties
    known only after the values were encoded are applied (text format runs, individual cell formats,
    column widths), formulas in user-added columns are copied to new rows, embedded charts are added
    and columns and rows are resized.

    :param task_ctx: Task Context.
    :param sheets: List of sheets to write.
    """
    prepared_sheets = self.prepare_sheets(sheets)

    #
    # Create all sheets and fill them with data.
    #
    for sheet in prepared_sheets:
        sheet_data = sheet.data

        # Get the new header. If no mapping was given, use the last header row.
        if sheet_data.row_columns:
            new_headers = [col.name for col in sheet_data.row_columns]
            column_resolver = ColumnIndexResolver(sheet_data.row_columns)
        else:
            # Work on a copy: the list is extended below and stored in the state, while the data row
            # itself is rewritten in place when the values are encoded.
            new_headers = list(sheet_data.data[sheet_data.header_row_count - 1])
            column_resolver = ColumnIndexResolver([])

        # Charts are placed to an extra column after the data.
        if sheet_data.embedded_charts:
            new_headers.append('__charts__')

        target_rows = len(sheet_data.data)

        # If this is the first sheet written, add rename operation if required.
        if self._first_sheet:
            self._first_sheet = False
            if self._spreadsheet_properties.title != task_ctx.job.name:
                self._enqueue_batch_update({
                    'updateSpreadsheetProperties': {
                        'properties': {
                            'title': task_ctx.job.name,
                        },
                        'fields': 'title',
                    }
                })

        sheet_id, row_count, total_columns, used_columns, column_additions, resize_all_cols = (
            self.create_or_update_sheet_properties(new_headers, sheet, target_rows)
        )

        # Register the sheet, so it can be referenced by its internal ID (used for charts below).
        if sheet_data.sheet_id:
            self._column_resolvers[sheet_data.sheet_id] = column_resolver
            self._sheet_id_map[sheet_data.sheet_id] = sheet_id
            self._sheet_data_map[sheet_data.sheet_id] = sheet_data

        #
        # Create merge definitions for the header.
        #
        for merge_def in sheet_data.merges:
            self._enqueue_batch_update({
                'mergeCells': {
                    'mergeType': 'MERGE_ALL',
                    'range': {
                        'sheetId': sheet_id,
                        'startRowIndex': merge_def.start_row,
                        'endRowIndex': merge_def.end_row,
                        'startColumnIndex': merge_def.start_column,
                        'endColumnIndex': merge_def.end_column,
                    },
                }
            })

        #
        # Create formatting to header.
        #
        if len(sheet_data.formats) > 0:
            row_data = [
                {'values': [{'userEnteredFormat': self.create_format(style)} for style in row]}
                for row in sheet_data.formats
            ]
            self._enqueue_batch_update({
                'updateCells': {
                    'rows': row_data,
                    'fields': 'userEnteredFormat',
                    'start': {'sheetId': sheet_id, 'rowIndex': 0, 'columnIndex': 0},
                }
            })

        #
        # Create value formats and conditional rules.
        #
        cond_rule_index = 0
        if sheet_data.row_columns is not None and target_rows > sheet_data.header_row_count:
            for index, column in enumerate(sheet_data.row_columns):
                # Cell value formats.
                number_format = None
                if column.cell_format:
                    if column.cell_format.format_type is FormatType.CURRENCY and task_ctx.currency:
                        # Use the pattern of the task currency, without modifying the column definition.
                        number_format = copy(column.cell_format)
                        number_format.pattern = OfficeCurrencyFormat.get_format_string(
                            task_ctx.currency, self._locale
                        )
                    else:
                        number_format = column.cell_format

                user_format = {
                    'verticalAlignment': column.vertical_alignment.value.upper(),
                }
                if number_format:
                    user_format['numberFormat'] = number_format.to_dict()
                if column.horizontal_alignment:
                    user_format['horizontalAlignment'] = column.horizontal_alignment.value.upper()

                self._enqueue_batch_update({
                    'repeatCell': {
                        'range': {
                            'sheetId': sheet_id,
                            'startRowIndex': sheet_data.header_row_count,
                            'startColumnIndex': index,
                            'endColumnIndex': index + 1,
                        },
                        'cell': {
                            'userEnteredFormat': user_format,
                        },
                        'fields': 'userEnteredFormat',
                    }
                })

                # Conditional rules.
                if column.format_rules:
                    for format_rule in column.format_rules:
                        rule = self.create_format_rule(
                            format_rule, self._locale, sheet_data.row_columns, sheet_data.header_row_count
                        )
                        # The ranges must be a plain list of grid ranges (not a list wrapped in a tuple).
                        rule['ranges'] = [
                            {
                                'sheetId': sheet_id,
                                'startRowIndex': sheet_data.header_row_count,
                                'startColumnIndex': index,
                                'endColumnIndex': index + 1,
                            }
                        ]
                        self._enqueue_batch_update({
                            'addConditionalFormatRule': {
                                'rule': rule,
                                'index': cond_rule_index,
                            }
                        })
                        cond_rule_index += 1

        try:
            # Send the first batch or requests.
            self._flush_batch_updates()

            # Record sheet to new state - sheet has been prepared and has a proper name:
            self._sheets_state[sheet_data.sheet_name] = {'id': sheet_data.sheet_id, 'headers': new_headers}

            #
            # Prepare the output stream, that will analyze the data during the output encoding.
            #
            formula_separators = self.get_formula_separators(self._locale)
            values = RowOutputTransformation(
                sheet_data.data,
                self._locale,
                formula_separators,
                sheet_data.row_columns,
                sheet_data.currency_column_index,
                sheet_data.header_row_count,
            )

            #
            # Write data.
            #
            sheet_range = (
                f'{sheet_data.sheet_name}!{rowcol_to_a1(1, 1)}:{rowcol_to_a1(target_rows, len(new_headers))}'
            )
            self._connector.values_update(
                self._spreadsheet_id, sheet_range, value_input_option='USER_ENTERED', body={'values': values}
            )

            # The following properties are collected while the values are being encoded.
            column_format_runs = values.column_format_runs
            column_widths = values.column_widths
            column_cell_formats = values.column_cell_formats
            column_text_widths = values.column_text_widths

            # Compute the width of each column in pixels; None means no explicit width.
            target_widths = []
            if sheet_data.row_columns:
                for col_index, col in enumerate(sheet_data.row_columns):
                    target_width = None
                    if col.width:
                        target_width = self.text_width_to_px(col.width)
                    elif col.max_width and col_index in column_text_widths:
                        target_width = self.text_width_to_px(min(col.max_width, column_text_widths[col_index]))

                    # Widths required by objects in the column are a lower bound.
                    if col_index in column_widths:
                        if target_width:
                            target_width = max(target_width, column_widths[col_index])
                        else:
                            target_width = column_widths[col_index]

                    target_widths.append(target_width)

            #
            # Apply in-cell text-format runs, if any.
            #
            for col_idx, format_runs in column_format_runs.items():
                self._enqueue_batch_update({
                    'updateCells': {
                        'start': {
                            'sheetId': sheet_id,
                            'rowIndex': format_runs.offset,
                            'columnIndex': col_idx,
                        },
                        'fields': 'textFormatRuns',
                        'rows': [
                            {'values': [{'textFormatRuns': runs} if runs is not None else {}]}
                            for runs in format_runs.cells
                        ],
                    }
                })

            #
            # Apply individual cell formats, if any.
            #
            for col_idx, cell_formats in column_cell_formats.items():
                self._enqueue_batch_update({
                    'updateCells': {
                        'start': {
                            'sheetId': sheet_id,
                            'rowIndex': cell_formats.offset,
                            'columnIndex': col_idx,
                        },
                        'fields': 'userEnteredFormat',
                        'rows': [
                            {
                                'values': [
                                    {
                                        'userEnteredFormat': {
                                            'numberFormat': target_format.to_dict() if target_format else {},
                                        },
                                    }
                                ]
                            }
                            for target_format in cell_formats.cells
                        ],
                    }
                })

            self._flush_batch_updates()

            # Copy formulas if necessary.
            if total_columns > used_columns and target_rows > row_count:
                # Read user-specified columns from the last row.
                # If any formulas are found, they will be copied.
                sheet_range = (
                    f'{sheet_data.sheet_name}!{rowcol_to_a1(row_count, used_columns + 1)}:'
                    f'{rowcol_to_a1(row_count, total_columns)}'
                )
                response = self._connector.values_get(
                    self._spreadsheet_id, sheet_range, value_render_option='FORMULA'
                )
                if 'values' in response and len(response['values']) > 0:
                    row = response['values'][0]
                    for index, cell in enumerate(row):
                        if isinstance(cell, str) and len(cell) > 0 and cell[0] == '=':
                            self._enqueue_batch_update({
                                'copyPaste': {
                                    'source': {
                                        'sheetId': sheet_id,
                                        'startRowIndex': row_count - 1,
                                        'endRowIndex': row_count,
                                        'startColumnIndex': used_columns + index,
                                        'endColumnIndex': used_columns + index + 1,
                                    },
                                    'destination': {
                                        'sheetId': sheet_id,
                                        'startRowIndex': row_count,
                                        'endRowIndex': target_rows,
                                        'startColumnIndex': used_columns + index,
                                        'endColumnIndex': used_columns + index + 1,
                                    },
                                    'pasteType': 'PASTE_FORMULA',
                                    'pasteOrientation': 'NORMAL',
                                }
                            })

            #
            # Add embedded charts if necessary:
            #
            if sheet_data.embedded_charts:
                chart_column_index = len(new_headers) - 1
                start_index = sheet_data.header_row_count
                chart_factory = ChartDefinitionFactory(
                    sheet_id_map=self._sheet_id_map,
                    sheet_data_map=self._sheet_data_map,
                    column_resolvers=self._column_resolvers,
                    current_sheet_id=sheet_id,
                    current_header_rows=start_index,
                    current_column_resolver=column_resolver,
                )

                # Determine the maximum width of a chart
                max_chart_width = 0
                for chart in sheet_data.embedded_charts:
                    if chart.width > max_chart_width:
                        max_chart_width = chart.width

                # Resize the column with charts:
                self._enqueue_batch_update({
                    'updateDimensionProperties': {
                        'range': {
                            'sheetId': sheet_id,
                            'dimension': 'COLUMNS',
                            'startIndex': chart_column_index,
                            'endIndex': chart_column_index + 1,
                        },
                        'properties': {
                            'pixelSize': max_chart_width + 2 * self.CHART_X_MARGIN,
                        },
                        'fields': 'pixelSize',
                    }
                })

                # Add all charts
                y_offset = 0
                for chart in sheet_data.embedded_charts:
                    definition = chart_factory.create_embedded_chart_dict(
                        chart, start_index, chart_column_index, y_offset, self.CHART_X_MARGIN
                    )
                    self._enqueue_batch_update({
                        'addChart': {
                            'chart': definition,
                        }
                    })
                    y_offset += chart.height + self.CHART_Y_MARGIN

            #
            # Resize columns - either all, or just the columns that were added.
            #
            if not resize_all_cols:
                for addition in column_additions:
                    if addition.insert_index in column_widths:
                        target_width = column_widths[addition.insert_index]
                    else:
                        target_width = None

                    if target_width is None or target_width == 0:
                        self._enqueue_batch_update({
                            'autoResizeDimensions': {
                                'dimensions': {
                                    'sheetId': sheet_id,
                                    'dimension': 'COLUMNS',
                                    'startIndex': addition.insert_index,
                                    'endIndex': addition.insert_index + 1,
                                }
                            }
                        })
                    else:
                        self._enqueue_batch_update({
                            'updateDimensionProperties': {
                                'range': {
                                    'sheetId': sheet_id,
                                    'dimension': 'COLUMNS',
                                    'startIndex': addition.insert_index,
                                    'endIndex': addition.insert_index + 1,
                                },
                                'properties': {
                                    'pixelSize': target_width,
                                },
                                'fields': 'pixelSize',
                            }
                        })
            else:
                # Auto-resize everything first, then override the columns with an explicit width.
                self._enqueue_batch_update({
                    'autoResizeDimensions': {
                        'dimensions': {
                            'sheetId': sheet_id,
                            'dimension': 'COLUMNS',
                        }
                    }
                })
                for col_index, width in enumerate(target_widths):
                    if width is not None:
                        self._enqueue_batch_update({
                            'updateDimensionProperties': {
                                'range': {
                                    'sheetId': sheet_id,
                                    'dimension': 'COLUMNS',
                                    'startIndex': col_index,
                                    'endIndex': col_index + 1,
                                },
                                'properties': {
                                    'pixelSize': width,
                                },
                                'fields': 'pixelSize',
                            }
                        })

            #
            # Resize rows - will be always performed.
            #
            if sheet_data.row_height and target_rows > sheet_data.header_row_count:
                self._enqueue_batch_update({
                    'updateDimensionProperties': {
                        'range': {
                            'sheetId': sheet_id,
                            'dimension': 'ROWS',
                            'startIndex': sheet_data.header_row_count,
                        },
                        'properties': {
                            'pixelSize': sheet_data.row_height,
                        },
                        'fields': 'pixelSize',
                    }
                })

            self._flush_batch_updates()
        except HttpError as exc:
            GoogleSpreadsheetError.raise_from_http_error(exc)
def create_or_update_sheet_properties(
    self, new_headers: list, sheet: SheetConfiguration, target_rows: int
) -> tuple[int, int, int, int, list[ItemAddition], bool]:
    """
    Update an already existing sheet (potentially overwriting the default one), or create a new sheet if this
    is a new sheet in the report.

    All changes are only enqueued to the current batch of requests; nothing is sent by this method itself.

    :param new_headers: Table headers.
    :param sheet: Sheet configuration.
    :param target_rows: Total number of rows in the table.
    :return: Properties of the new sheet as a tuple of:
        ID of the sheet in Google Sheets,
        number of rows (for an existing sheet the count retrieved before the update, otherwise ``target_rows``),
        total number of columns,
        number of columns used by the table,
        list of added columns (empty unless an existing sheet is updated),
        flag telling whether all columns should be resized (set for new and overwritten sheets).
    """
    sheet_data = sheet.data
    column_additions: list[ItemAddition] = []
    should_set_filter = sheet_data.create_filter

    if not sheet.use_default_sheet:
        #
        # Use and update existing sheet if present.
        #
        if sheet.previous_name in self._retrieved_sheets:
            retrieved_sheet = self._retrieved_sheets[sheet.previous_name]
            sheet_id = retrieved_sheet.id
            row_count = retrieved_sheet.row_count
            used_columns = len(new_headers)

            #
            # Delete all conditional rules.
            #
            # Index 0 is deleted repeatedly: one request per existing rule.
            for _ in range(0, retrieved_sheet.cond_rule_count):
                self._enqueue_batch_update({
                    'deleteConditionalFormatRule': {
                        'sheetId': sheet_id,
                        'index': 0,
                    }
                })

            #
            # Delete all charts
            #
            for chart_id in retrieved_sheet.chart_ids:
                self._enqueue_batch_update({
                    'deleteEmbeddedObject': {
                        'objectId': chart_id,
                    }
                })

            #
            # Unmerge all cells.
            #
            self._enqueue_batch_update({
                'unmergeCells': {
                    'range': {
                        'sheetId': sheet_id,
                    }
                }
            })

            # Sheet properties to change; sent in a single request below.
            update_props = {}

            #
            # Rename the sheet if necessary.
            #
            if sheet.previous_name != sheet_data.sheet_name:
                update_props['title'] = sheet_data.sheet_name

            #
            # Reset the frozen columns/rows if the number of dims is less than the current number of frozen dims.
            #
            reset_all_frozen = False
            if (retrieved_sheet.frozen_rows is not None and retrieved_sheet.frozen_rows >= target_rows) or (
                retrieved_sheet.frozen_columns is not None and retrieved_sheet.frozen_columns >= used_columns
            ):
                update_props['gridProperties'] = {
                    'frozenRowCount': 0,
                    'frozenColumnCount': 0,
                }
                reset_all_frozen = True

            if update_props:
                self._enqueue_batch_update({
                    'updateSheetProperties': {
                        'properties': {'sheetId': sheet_id, **update_props},
                        'fields': ','.join(get_dict_value_paths(update_props)),
                    }
                })

            #
            # Compute column deletions and additions.
            #
            resize_all_cols = False
            if sheet.previous_state and 'headers' in sheet.previous_state:
                # Headers of the previous run are known: compare them with the new headers.
                column_deletions, column_additions = compute_list_changes(
                    new_headers, sheet.previous_state['headers']
                )
                total_columns = retrieved_sheet.col_count - len(column_deletions) + len(column_additions)
            else:
                # No previous headers: only append the columns that are missing in the grid.
                column_deletions: list[ItemDeletion] = []
                column_additions: list[ItemAddition] = []
                if retrieved_sheet.col_count < len(new_headers):
                    for idx in range(retrieved_sheet.col_count, len(new_headers)):
                        column_additions.append(ItemAddition(idx, idx))
                total_columns = used_columns

            # Remove all frozen columns before deleting any columns to prevent
            # the "Sorry, it is not possible to delete all non-frozen columns" error
            should_reset_frozen_cols = (
                column_deletions
                and not reset_all_frozen
                and retrieved_sheet.frozen_columns is not None
                and retrieved_sheet.frozen_columns > 0
            )
            if should_reset_frozen_cols:
                self._enqueue_batch_update({
                    'updateSheetProperties': {
                        'properties': {
                            'sheetId': sheet_id,
                            'gridProperties': {
                                'frozenColumnCount': 0,
                            },
                        },
                        'fields': 'gridProperties.frozenColumnCount',
                    }
                })

            #
            # Delete columns.
            #
            for deletion in column_deletions:
                self._enqueue_batch_update({
                    'deleteDimension': {
                        'range': {
                            'sheetId': sheet_id,
                            'dimension': 'COLUMNS',
                            'startIndex': deletion.deletion_index,
                            'endIndex': deletion.deletion_index + 1,
                        }
                    }
                })

            #
            # Add columns.
            #
            for addition in column_additions:
                self._enqueue_batch_update({
                    'insertDimension': {
                        'range': {
                            'sheetId': sheet_id,
                            'dimension': 'COLUMNS',
                            'startIndex': addition.insert_index,
                            'endIndex': addition.insert_index + 1,
                        },
                        # There is nothing before the first column to inherit from.
                        'inheritFromBefore': (addition.insert_index > 0),
                    }
                })

            # Restore previously un-frozen columns.
            if should_reset_frozen_cols:
                self._enqueue_batch_update({
                    'updateSheetProperties': {
                        'properties': {
                            'sheetId': sheet_id,
                            'gridProperties': {
                                'frozenColumnCount': retrieved_sheet.frozen_columns,
                            },
                        },
                        'fields': 'gridProperties.frozenColumnCount',
                    }
                })

            #
            # Add or remove rows to match target number of rows.
            #
            if row_count < target_rows:
                self._enqueue_batch_update({
                    'insertDimension': {
                        'range': {
                            'sheetId': sheet_id,
                            'dimension': 'ROWS',
                            'startIndex': row_count,
                            'endIndex': target_rows,
                        },
                        'inheritFromBefore': (row_count > 0),
                    }
                })
            elif row_count > target_rows:
                # Keep at least 1 blank row: fixes the error with deleting all but frozen rows.
                if target_rows <= sheet_data.header_row_count:
                    start_index = sheet_data.header_row_count + 1
                else:
                    start_index = target_rows

                # Delete rows, but only if necessary, possibly keeping 1 empty row.
                if row_count > start_index:
                    self._enqueue_batch_update({
                        'deleteDimension': {
                            'range': {
                                'sheetId': sheet_id,
                                'dimension': 'ROWS',
                                'startIndex': start_index,
                                'endIndex': row_count,
                            }
                        }
                    })

                # Clear all blank rows, if any, but try to keep the formulas.
                # Only the columns used by the table are cleared.
                if start_index > target_rows:
                    self._enqueue_batch_update({
                        'repeatCell': {
                            'range': {
                                'sheetId': sheet_id,
                                'startRowIndex': target_rows,
                                'endRowIndex': start_index,
                                'startColumnIndex': 0,
                                'endColumnIndex': used_columns,
                            },
                            'cell': {
                                'userEnteredValue': {'stringValue': ''},
                            },
                            'fields': 'userEnteredValue',
                        }
                    })

            # Update filter views
            # The existing basic filter is replaced only if it does not start at the last header row
            # and first column, or if it does not cover all rows and columns of the table.
            if sheet_data.create_filter and retrieved_sheet.basic_filter:
                filter_view = retrieved_sheet.basic_filter
                should_set_filter = (
                    (filter_view.start_row_index != (sheet_data.header_row_count - 1))
                    or (filter_view.end_row_index is not None and filter_view.end_row_index < target_rows)
                    or (filter_view.start_column_index is not None and filter_view.start_column_index != 0)
                    or (filter_view.end_column_index is not None and filter_view.end_column_index < used_columns)
                )
        else:
            #
            # Create a new blank sheet.
            #
            sheet_id = self.allocate_sheet_id()
            grid_props = self.create_new_grid_properties(sheet_data, new_headers)
            self._enqueue_batch_update({
                'addSheet': {
                    'properties': {
                        'sheetId': sheet_id,
                        'title': sheet_data.sheet_name,
                        'gridProperties': grid_props,
                    }
                }
            })
            resize_all_cols = True
            total_columns = used_columns = grid_props['columnCount']
            row_count = target_rows
    else:
        #
        # Overwrite default sheet.
        #
        sheet_id = self._default_sheet_id
        grid_props = self.create_new_grid_properties(sheet_data, new_headers)
        self._enqueue_batch_update({
            'updateSheetProperties': {
                'properties': {
                    'sheetId': sheet_id,
                    'title': sheet_data.sheet_name,
                    'gridProperties': grid_props,
                },
                'fields': 'title, gridProperties',
            }
        })
        resize_all_cols = True
        total_columns = used_columns = grid_props['columnCount']
        row_count = target_rows

    # The filter starts at the last header row and spans all columns used by the table;
    # no end row is given in the range.
    if should_set_filter:
        self._enqueue_batch_update({
            'setBasicFilter': {
                'filter': {
                    'range': {
                        'sheetId': sheet_id,
                        'startRowIndex': sheet_data.header_row_count - 1,
                        'startColumnIndex': 0,
                        'endColumnIndex': used_columns,
                    },
                }
            }
        })

    return sheet_id, row_count, total_columns, used_columns, column_additions, resize_all_cols
def prepare_sheets(self, sheets: list[SheetData]) -> list[SheetConfiguration]:
    """
    Pairs each input sheet with the information known from the previous run:

    * Previous name and state of the sheet (looked up by sheet ID first, then by sheet name).
    * Whether the sheet should overwrite the default sheet - this is set for a single sheet at most.

    Sheets that are going to be written are also removed from the set of unused sheets.

    :param sheets: Input sheets.
    :return: Sheet configuration.
    """
    configurations: list[SheetConfiguration] = []

    for data in sheets:
        current_name = data.sheet_name
        known_name = current_name
        known_state = None

        # Look up the state by the internal sheet ID; the sheet could have been renamed since the last run.
        # The loop is not terminated on the first match, the last matching entry is used.
        if data.sheet_id:
            for stored_name, stored_state in self._previous_sheets.items():
                if 'id' in stored_state and stored_state['id'] == data.sheet_id:
                    known_name = stored_name
                    known_state = stored_state

        # Fall back to a lookup by the current name.
        if known_state is None and current_name in self._previous_sheets:
            known_state = self._previous_sheets[current_name]

        overwrite_default = bool(self._overwrite_default_sheet)
        if overwrite_default:
            # Only the first prepared sheet takes over the default sheet.
            self._overwrite_default_sheet = False
            used_name = current_name
        elif known_name in self._retrieved_sheets:
            used_name = known_name
        else:
            used_name = current_name

        # Mark sheet as used:
        self._unused_sheets.discard(used_name)

        configurations.append(SheetConfiguration(data, known_name, known_state, overwrite_default))

    return configurations
def finalize(self, task_ctx: 'task_context.TaskContextInterface', exc: BaseException = None):
    """
    Removes the sheets that were left unused by the output. Only sheets recorded in the previous state
    (i.e. previously created by the same report) that still exist in the spreadsheet are deleted.
    Nothing is done when the output ended with an exception.

    :param task_ctx: Task context.
    :param exc: Exception.
    """
    if exc is not None:
        return

    existing_sheets = self._retrieved_sheets
    delete_requests = [
        {
            'deleteSheet': {
                'sheetId': existing_sheets[sheet_name].id,
            }
        }
        for sheet_name in self._unused_sheets
        if sheet_name in existing_sheets
    ]

    try:
        self._execute_batch_update(delete_requests)
    except HttpError:
        # Ignore any HTTP errors, as this is a non-critical operation.
        pass
@staticmethod
def get_formula_separators(sheet_locale: babel.Locale = None) -> FormulaSeparators:
    """
    Picks the formula separators according to the decimal symbol of the given locale.

    :param sheet_locale: Locale of the spreadsheet.
    :return: Separators for the locale; separators for the ``.`` decimal symbol are used when no locale
        is given or when its decimal symbol is not known.
    """
    fallback = LOCALIZED_FORMULA_SEPARATORS['.']
    if not sheet_locale:
        return fallback

    latin_symbols = sheet_locale.number_symbols.get('latn', {})
    return LOCALIZED_FORMULA_SEPARATORS.get(latin_symbols.get('decimal', '.'), fallback)
@staticmethod
def extract_sheet_properties(sheet: dict[str, Any]) -> RetrievedSheet:
    """
    Converts a sheet resource, as contained in the loaded spreadsheet, to a :class:`RetrievedSheet`.

    :param sheet: Sheet resource.
    :return: Extracted properties: sheet ID, number of conditional rules, grid size, frozen rows and
        columns, basic filter and IDs of charts. Grid size is 0 when the sheet has no grid properties.
    """
    sheet_props = sheet['properties']
    cond_rule_count = len(sheet['conditionalFormats']) if 'conditionalFormats' in sheet else 0

    if 'gridProperties' in sheet_props:
        grid = sheet_props['gridProperties']
        col_count, row_count = grid['columnCount'], grid['rowCount']
        frozen_rows = grid.get('frozenRowCount', None)
        frozen_columns = grid.get('frozenColumnCount', None)
    else:
        col_count = row_count = 0
        frozen_rows = frozen_columns = None

    retrieved = RetrievedSheet(
        id=sheet_props['sheetId'],
        cond_rule_count=cond_rule_count,
        col_count=col_count,
        row_count=row_count,
        frozen_rows=frozen_rows,
        frozen_columns=frozen_columns,
    )

    if 'basicFilter' in sheet:
        filter_range = sheet['basicFilter']['range']
        # Boundaries missing in the range are stored as None.
        retrieved.basic_filter = FilterView(
            id=None,
            start_row_index=filter_range.get('startRowIndex', None),
            end_row_index=filter_range.get('endRowIndex', None),
            start_column_index=filter_range.get('startColumnIndex', None),
            end_column_index=filter_range.get('endColumnIndex', None),
        )

    retrieved.chart_ids.extend(chart['chartId'] for chart in sheet.get('charts', ()))
    return retrieved
@staticmethod
def create_new_grid_properties(sheet_data: SheetData, new_headers: list = None):
    """
    Creates the grid properties dictionary (size, gridlines and frozen rows/columns) for a sheet.

    The column count is taken from ``new_headers`` when given, otherwise from ``sheet_data.row_columns``, and
    as a last resort from the width of the first data row. Empty data without column definitions results in
    zero detected columns instead of an ``IndexError``.

    :param sheet_data: Sheet data to create the properties for.
    :param new_headers: Optional list of headers that overrides the column count.
    :return: Dictionary with ``rowCount``, ``columnCount``, ``hideGridlines`` and, when applicable,
        ``frozenRowCount`` and ``frozenColumnCount``.
    """
    if new_headers:
        column_count = len(new_headers)
    elif sheet_data.row_columns:
        column_count = len(sheet_data.row_columns)
    else:
        # No column definitions: infer the width from the first row, tolerating empty data.
        column_count = len(sheet_data.data[0]) if len(sheet_data.data) > 0 else 0

    # Always keep room for at least one row below the header and at least one column.
    row_count = max(len(sheet_data.data), sheet_data.header_row_count + 1)
    props = {'rowCount': row_count, 'columnCount': max(column_count, 1), 'hideGridlines': False}

    # The header is frozen only when there are rows below it.
    if sheet_data.freeze_header and sheet_data.header_row_count < len(sheet_data.data):
        props['frozenRowCount'] = sheet_data.header_row_count

    if sheet_data.freeze_columns:
        # At least one column has to stay unfrozen. Clamp at zero so that a sheet with no detected columns
        # does not produce a negative frozen column count.
        props['frozenColumnCount'] = max(min(sheet_data.freeze_columns, column_count - 1), 0)

    return props
@staticmethod
def create_format(style: CellStyle):
    """
    Converts a cell style to its dictionary form.

    :param style: Cell style, may be ``None``.
    :return: ``style.to_dict()``, or an empty dictionary when the style is ``None`` or reports itself as empty.
    """
    if style is None or style.empty:
        return {}
    return style.to_dict()
@staticmethod
def format_value(value, locale: babel.Locale = None, row_columns: list[Column] = None, header_size: int = None):
    """
    Converts a value to the string form used in rule values and formulas.

    Values starting with ``=`` are treated as formulas: their column references are expanded using
    ``row_columns`` and ``header_size``. When ``header_size`` is not given, the formula is returned unchanged,
    because the references cannot be expanded without it. For all other values, the decimal point is replaced
    with the decimal symbol of the given locale.

    :param value: Value to format; it is converted with ``str()`` first.
    :param locale: Locale whose ``latn`` decimal symbol replaces ``'.'``, may be ``None``.
    :param row_columns: Columns passed on to the column reference expansion.
    :param header_size: Number of header rows; ``header_size + 1`` is passed on to the expansion.
    :return: Formatted string.
    """
    value = str(value)

    # Convert column references in formulas.
    if value.startswith('='):
        if header_size is None:
            # Callers that format plain values pass no sheet layout. Keep the formula as it is instead of
            # failing on ``None + 1``.
            return value
        return expand_column_references(value, row_columns, header_size + 1)

    # Convert decimal separator in numbers.
    if locale is not None:
        symbol = locale.number_symbols.get('latn', {}).get('decimal', '.')
        if symbol != '.':
            value = value.replace('.', symbol)
    return value
@classmethod
def create_format_rule(
    cls, format_rule, sheet_locale: babel.Locale = None, row_columns: list[Column] = None, header_size: int = None
):
    """
    Converts a conditional format rule to its dictionary representation.

    :param format_rule: Either a :py:class:`BoolConditionRule` or a :py:class:`GradientRule`.
    :param sheet_locale: Locale used to format the rule values.
    :param row_columns: Columns used to expand column references in formula values.
    :param header_size: Number of header rows, used to expand column references in formula values.
    :return: Dictionary with a single ``booleanRule`` or ``gradientRule`` key.
    :raises ValueError: When the rule is of any other type.
    """
    if isinstance(format_rule, BoolConditionRule):
        # A rule may carry a list of values, a single value, or no value at all.
        if isinstance(format_rule.value, list):
            operands = format_rule.value
        elif format_rule.value is not None:
            operands = [format_rule.value]
        else:
            operands = []
        values = [
            {'userEnteredValue': cls.format_value(operand, sheet_locale, row_columns, header_size)}
            for operand in operands
        ]

        return {
            'booleanRule': {
                'condition': {
                    'type': cls.CONDITION_TYPES.get(format_rule.op),
                    'values': values,
                },
                'format': format_rule.style.to_dict(),
            }
        }

    if isinstance(format_rule, GradientRule):

        def create_gradient_point(point: GradientPoint):
            if point.value is not None:
                # Pass the sheet layout along so that a formula value gets its column references expanded,
                # the same way the boolean rule values do.
                point_value = cls.format_value(point.value, sheet_locale, row_columns, header_size)
            else:
                point_value = ''
            return {
                'color': point.color.to_normalized_dict(),
                'type': point.value_type.name,
                'value': point_value,
            }

        return {
            'gradientRule': {
                'minpoint': create_gradient_point(format_rule.point_min),
                'midpoint': create_gradient_point(format_rule.point_mid),
                'maxpoint': create_gradient_point(format_rule.point_max),
            }
        }

    raise ValueError(f'Unsupported rule type: {type(format_rule)}.')
@staticmethod
def create_text_format_runs(text: FormattedText):
    """
    Creates one format run for each node of the given formatted text.

    Each run starts at the summed length of all preceding nodes. :py:class:`FormattedTextNode` instances
    contribute their own text format; any other node gets an empty format.

    :param text: Formatted text to iterate over.
    :return: List of dictionaries with ``startIndex`` and ``format`` keys.
    """
    # NOTE(review): the Sheets API documents startIndex in UTF-16 code units, whereas len() counts code
    # points - confirm the behaviour for characters outside the BMP.
    plain_format = {}
    format_runs = []
    position = 0
    for segment in text:
        segment_format = segment.text_format.to_dict() if isinstance(segment, FormattedTextNode) else plain_format
        format_runs.append({'startIndex': position, 'format': segment_format})
        position += len(segment)
    return format_runs
@staticmethod
def format_str(value: str):
escaped = value.replace('"', '" & CHAR(34) & "')
return f'"{escaped}"'
@classmethod
def formula_value(cls, value, sheet_locale: babel.Locale):
    """
    Formats a value for use inside a formula.

    :param value: Value to format.
    :param sheet_locale: Locale used to format non-string values.
    :return: Strings quoted by :py:meth:`format_str`, anything else formatted by :py:meth:`format_value`.
    """
    if not isinstance(value, str):
        return cls.format_value(value, sheet_locale)
    return cls.format_str(value)
@classmethod
def create_sparkline_formula(cls, sparkline: Sparkline, locale: babel.Locale, separators: FormulaSeparators):
    """
    Builds a ``SPARKLINE`` formula for the given sparkline together with a width for it.

    :param sparkline: Sparkline with values and options.
    :param locale: Locale used to format the values and options.
    :param separators: Separators used between formula parameters, cells and rows.
    :return: Tuple of formula and width; ``('', 20)`` when the sparkline has fewer than two values.
    """
    point_count = len(sparkline.values)
    if point_count < 2:
        return '', 20

    # Values are joined with the row separator inside an array literal.
    data_literal = '{' + separators.rows.join(cls.format_value(item, locale) for item in sparkline.values) + '}'

    # Each option is a name/value pair of cells; the pairs are joined with the row separator.
    option_map = cls.create_sparkline_options(sparkline.options, sparkline.color_override)
    option_pairs = [
        f'{cls.format_str(name)}{separators.cells}{cls.formula_value(setting, locale)}'
        for name, setting in option_map.items()
    ]
    options_literal = '{' + separators.rows.join(option_pairs) + '}'

    # The width depends on the chart type and the number of values.
    chart_type = sparkline.options.type
    if chart_type == SparklineType.LINE:
        width = 10 * (point_count - 1)
    elif chart_type == SparklineType.COLUMN:
        width = 8 * point_count
    else:
        width = 60

    return f'=SPARKLINE({data_literal}{separators.parameters}{options_literal})', width
@classmethod
def create_sparkline_options(cls, options: SparklineOptions, color_override: Color = None):
    """
    Creates the dictionary of sparkline options, keyed by option name.

    Only ``charttype`` is always present. The other options are added when their source value is not ``None``.

    :param options: Sparkline options.
    :param color_override: Color that takes precedence over ``options.color`` when given.
    :return: Dictionary of option names to values.
    """
    settings = {'charttype': cls.SPARKLINE_TYPES[options.type]}

    for key, bound in (('ymin', options.y_min), ('ymax', options.y_max)):
        if bound is not None:
            settings[key] = bound

    if options.line_width is not None:
        settings['linewidth'] = round(options.line_width)

    main_color = options.color if color_override is None else color_override
    color_options = (
        ('color', main_color),
        ('negcolor', options.negative_color),
        ('lowcolor', options.low_color),
        ('highcolor', options.high_color),
    )
    for key, color in color_options:
        if color is not None:
            settings[key] = color.to_hex_str()

    return settings
def text_width_to_px(self, text_width):
    """
    Converts a text width to pixels by multiplying it with ``AVG_CHAR_WIDTH`` and rounding the result.

    :param text_width: Text width to convert.
    :return: Rounded width in pixels.
    """
    pixels = text_width * self.AVG_CHAR_WIDTH
    return round(pixels)
def allocate_sheet_id(self) -> int:
    """
    Gets a random sheet ID that is not already in use.

    On the first call, the set of used IDs is initialized from the IDs of the retrieved sheets. Every
    allocated ID is added to the set, so it is never returned twice.

    :return: Unique sheet ID between 0 and ``MAX_SHEET_ID`` (inclusive).
    """
    if self._sheet_id_set is None:
        # _retrieved_sheets is keyed by sheet name; the IDs live on the values. Iterating the mapping itself
        # would collect the names, and the check below would then never match an existing sheet ID.
        self._sheet_id_set = {sheet.id for sheet in self._retrieved_sheets.values()}

    sheet_id = random.randint(0, self.MAX_SHEET_ID)
    while sheet_id in self._sheet_id_set:
        sheet_id = random.randint(0, self.MAX_SHEET_ID)

    self._sheet_id_set.add(sheet_id)
    return sheet_id
def _enqueue_batch_update(self, request: dict[str, Any]):
"""
Adds the given batch update request to the queue. All queued requests are executed by calling
:py:meth:`flush_batch_updates`.
:param request: Request dictionary to add. See
https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/request
"""
self._requests.append(request)
def _flush_batch_updates(self):
"""
Executes all queued batch updates and clears the current queue.
"""
self._execute_batch_update(self._requests)
self._requests.clear()
def _execute_batch_update(self, requests: list[dict[str, Any]]):
"""
Excutes the given batchUpdate requests.
:param requests: List of requess.
:return: Response from the batchUpdate call.
"""
if requests:
return self._connector.batch_update(
self._spreadsheet_id,
body={
'requests': requests,
},
)
else:
return None