В статье показан базовый, но рабочий подход к тестированию трансформации данных в ETL-процессах. Автор, на примере Python-проекта с pytest, allure и psycopg2, демонстрирует, как автоматизировать создание и наполнение таблиц, хранить схемы и данные в JSON, поднимать фикстуры для работы с БД и API, а затем сравнивать результат. Всё это — с живым стилем, юмором и кучей кода.
651 открытий6К показов
Как тестировать перенос и трансформацию данных без боли
Я всё думаю над тем, о чём говорить на своём канале. Казалось бы, инфы в интернете — вагон и маленькая тележка: бери, читай, вдохновляйся. Но вот что удивительно — про то, как тестировать данные, как они перетекают из одной таблицы в другую и не превращаются в тыкву, пишут мало.
А зря.
За свою практику я встречала разные подходы к организации Data QA. В этой статье я расскажу про самый простой и быстрый вариант, который можно «собрать на коленке», а потом уже развивать. С примерами кода и тем, зачем SQL нам нужен в этой жизни.
И небольшой дисклеймер. Если вы знаете, как сделать это на Python стильно модно молодежно, обязательно напишите мне об этом в комментариях, я с удовольствием почитаю про ваш подход.
Также подписывайтесь на мой ТГ, который я почти не веду.
Разработчик запускает свой код (Spark, Airflow или что у него там), и данные магическим образом перекладываются + преобразуются.
На выходе получаем result_table.
Вопрос: как это тестировать?
Самый быстрый и простой путь
Подготовка:
Создаём таблицу-источник.
Наполняем её данными (синтетика или реальные примеры с прода).
Действие:
Запускаем ETL код.
Сравниваем данные «как было» и «как стало».
После теста:
Удаляем таблицу-источник.
Удаляем таблицу-приёмник.
На коленке это можно собрать так:
CREATE TABLE …
INSERT INTO …
Ручной запуск пайплайна (Spark/Airflow).
Какой-нибудь скрипт для сравнения результатов.
Рабочая схема? Да.
Удобная? Нет.
Где хранить эти скрипты?
Можно, конечно, прилепить их к задаче в Jira в виде файликов. Но выглядеть это будет как «фигня на палке». Через месяц никто не вспомнит, что это было и зачем.
Поэтому я решила собрать свой Python-проект для автоматизации. А вы можете склепать себе такой же, и жизнь сразу станет легче.
pytest # для запуска тестов
allure # для красивых отчётов
psycopg2 # для работы с PostgreSQL (или вашей БД)
requests # для триггеров пайплайнов
1. Utils — мелкие, но полезные
Здесь мы храним всякую мелочёвку. Например, функцию для открытия JSON:
def open_json_file(path: str) -> List[Dict[str, Any]]:
"""Читает и парсит JSON-файл, возвращая список словарей."""
file_path = Path(path)
if not file_path.exists():
raise FileNotFoundError(f'JSON файл не найден: {file_path}')
if not file_path.is_file():
raise IsADirectoryError(f'Указанный путь является директорией: {file_path}')
try:
return json.loads(file_path.read_text(encoding='utf-8'))
except json.JSONDecodeError as e:
raise json.JSONDecodeError(f'Ошибка декодирования JSON: {file_path}', e.doc, e.pos) from e
Важно отметить, что данные я всегда стараюсь хранить в виде массива с json. Так удобнее использовать.
2. APIClient — для триггеров
Здесь всё максимально тривиально: клиент для вызова API и запуска ETL джоб.
3. PostgreSQLClient — основной игрок
Класс для подключения, запросов, создания таблиц, вставки данных и удаления.
class PostgreSQLClient:
"""Клиент для работы с PostgreSQL с поддержкой подключения и выполнения запросов."""
def __init__(self,
host: str = 'localhost',
port: int = 5432,
dbname: str = 'postgres',
user: str = 'postgres',
password: str = '',
autocommit: bool = False):
self.connection_params = {
'host': host,
'port': port,
'dbname': dbname,
'user': user,
'password': password,
}
self.autocommit = autocommit
self.conn = None
self.cursor = None
def connect(self) -> None:
"""Установка соединения с базой данных."""
try:
extensions.adapters = {} # Очищаем все адаптеры
self.conn = psycopg2.connect(**self.connection_params)
self.conn.autocommit = self.autocommit
self.cursor = self.conn.cursor(cursor_factory=DictCursor)
except psycopg2.Error as e:
raise RuntimeError(f'Ошибка подключения: {e}') from e
def disconnect(self) -> None:
"""Закрытие соединения."""
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
def execute_query(self,
query: Union[str, sql.Composable],
params: Optional[Tuple] = None,
fetch: bool = False) -> Optional[List[Dict[str, Any]]]:
"""Выполнение SQL-запроса."""
try:
self.cursor.execute(query, params)
self.conn.commit()
if fetch:
return self.cursor.fetchall()
except psycopg2.Error as e:
self.conn.rollback()
raise RuntimeError(f'Ошибка выполнения запроса: {e}\nЗапрос: {query}') from e
return None
Фикстуры (conftest.py)
API фикстуры
Я показываю пример, как поднимаю один API клиент. Но у вас их может быть много: например, разные сервисы или разные перекладчики.
@pytest.fixture(scope='session')
def api_config():
config = {
'base_url': os.getenv('API_BASE_URL'),
'username': os.getenv('API_USERNAME'),
'password': os.getenv('API_PASSWORD'),
}
if not all(config.values()):
missing = [k for k, v in config.items() if not v]
pytest.fail(f'Не заданы переменные окружения: {missing}')
return config
@pytest.fixture(scope='session')
def api_client(api_config):
client = APIClient(base_url=api_config['base_url'])
client.authenticate(
username=api_config['username'],
password=api_config['password'],
)
return client
DB фикстура
Пример для дефолтной PostgreSQL, но вы можете добавить фикстуры для поднятия GreenPlum или ClickHouse.
Конечно, можно создать отдельный скрипт на чистом sql для создания и редактирования своих табличек: больших и маленьких. Но, если честно, это так *цензура* неудобно.
Поэтому я придумала хранить схему и данные в отдельных json файлах и налету генерить скрипты. Да, Америку я вновь не открываю, и очень жаль.
В директории тестов я создала папку resources, где храню эти файлы.
Конечно, в таком виде вы не передадите ничего в Postgres и ничего не слепите. Так что я услужливо предоставляю вам код, чтобы вы его копировали и вдохновлялись.
def create_table_from_columns(
self,
table: str,
columns: list[dict[str, Any]],
if_not_exists: bool = True,
) -> None:
if not columns:
raise ValueError("columns must be a non-empty list of column specs")
fragments = [sql.SQL("CREATE TABLE")]
if if_not_exists:
fragments.append(sql.SQL("IF NOT EXISTS"))
fragments.append(self._ident_from_dotpath(table))
col_defs: list[sql.SQL] = []
for col in columns:
name = col["column_name"]
data_type = col["data_type"]
is_nullable = (str(col.get("is_nullable", "YES")).upper() != "NO")
default = col.get("column_default", None)
constraint = col.get("constraint", None)
parts: list[sql.SQL] = [
sql.Identifier(name),
sql.SQL(data_type),
]
if not is_nullable:
parts.append(sql.SQL("NOT NULL"))
if default is not None:
parts.append(sql.SQL("DEFAULT"))
parts.append(sql.SQL(str(default)))
if constraint:
parts.append(sql.SQL(constraint))
col_defs.append(sql.SQL(" ").join(parts))
query = sql.SQL(" ").join(fragments) + sql.SQL(" ({})").format(sql.SQL(", ").join(col_defs))
self.execute_query(query)
def drop_table(self, table: str, *, cascade: bool = True, if_exists: bool = True) -> None:
"""
Удаляет таблицу.
:param table: Имя таблицы (можно 'schema.table' или просто 'table')
:param cascade: Добавить CASCADE (по умолчанию True)
:param if_exists: Добавить IF EXISTS, чтобы не падать, если таблицы нет (по умолчанию True)
"""
fragments = [sql.SQL("DROP TABLE")]
if if_exists:
fragments.append(sql.SQL("IF EXISTS"))
fragments.append(self._ident_from_dotpath(table))
if cascade:
fragments.append(sql.SQL("CASCADE"))
query = sql.SQL(" ").join(fragments) + sql.SQL(";")
self.execute_query(query)
def insert_data(self, table: str, data: dict[str, Any]) -> None:
"""
Вставка данных в таблицу.
:param table: Имя таблицы
:param data: Данные для вставки {column: value}
"""
columns = sql.SQL(', ').join(map(sql.Identifier, data.keys()))
values = sql.SQL(', ').join(sql.Placeholder() * len(data))
query = sql.SQL('INSERT INTO {} ({}) VALUES ({})').format(
self._ident_from_dotpath(table),
columns,
values,
)
self.execute_query(query, tuple(data.values()))
Вы можете взять и пропихнуть эти куски кода прямо в тело теста, но я предпочитаю оборачивать это дело в фикстуры-управленцы, которые будут делать это за меня и записывать отдельными шагами в Алюр:
@pytest.fixture
def create_table(request):
def _create(db_client:PostgreSQLClient, columns: list[dict[str, Any]], table_name: str):
with allure.step(f"Создание таблицы {table_name}"):
db_client.create_table_from_columns(table=table_name, columns=columns,)
return table_name
return _create
@pytest.fixture
def drop_table(request):
def _drop(db_client:PostgreSQLClient, table_name: str):
with allure.step(f"Удаление таблицы {table_name}"):
db_client.drop_table(table=table_name)
return table_name
return _drop
@pytest.fixture(scope="session")
def insert_table_data():
def _insert(db_client: PostgreSQLClient, table_name: str, data: list[dict[str, Any]]):
total = len(data)
success = 0
warnings: list[str] = []
with allure.step(f"Добавление данных в таблицу {table_name}"):
for d in data:
try:
db_client.insert_data(table=table_name, data=d)
success += 1
except Exception as e:
msg = f"⚠️ Пропущена строка {d} из-за ошибки: {e}"
warnings.append(msg)
allure.attach(msg, name="Warning", attachment_type=allure.attachment_type.TEXT)
logger.warning(msg)
summary = f"Успешно добавлено: {success} из {total}"
if warnings:
tail = warnings[:50]
if len(warnings) > 50:
tail.append(f"... и ещё {len(warnings) - 50} предупреждений")
summary += "\n\nWarnings:\n" + "\n".join(tail)
allure.attach(
summary,
name=f"Результат вставки в {table_name}",
attachment_type=allure.attachment_type.TEXT
)
return {"success": success, "total": total, "warnings": warnings}
return _insert
Отдельное удовольствие мне даёт вот эта строчка кода db_client:PostgreSQLClient.Она значит, что если так вышло, что у меня есть куча разных PostgesSql, я могу в эту фикстуру прокинуть нужный мне клиент без дополнительных приседаний. И это, конечно, прекрасно.
Пример: подготовка данных
@pytest.fixture
def prepare_db(greenplum_client, create_table, insert_table_data, drop_table):
# Создаём таблицу
schema = open_json_file('resources/schema.json')
table_name = f'public.qa_test_{generate_lowercase_name()}'
create_table(greenplum_client, table_name, schema)
# Вставляем данные
data = open_json_file('resources/data.json')
insert_table_data(greenplum_client, table_name, data)
yield {"table_name": table_name, "data": data}
# Чистим за собой
drop_table(greenplum_client, table_name)
Самые внимательные уже заметили, что в примере выше я поднимала postgres-клиента.
Так что разницы почти никакой (ну, кроме тех случаев, когда внезапно очень даже есть 🙃).
И сам тест
def test_materialisation_with_valid_data(
prepare_db,
meta_postgres_client,
data_gate_api_client,
):
source_table = prepare_db['table_name']
expected = prepare_db['data']
# Запуск пайплайна
data_gate_api_client.trigger_job(source_table)
# Читаем результат
actual = meta_postgres_client.execute_query(
f"SELECT * FROM result_table WHERE source='{source_table}'",
fetch=True
)
# Сравниваем
with allure.step('Сравнение результатов'):
assert_equal(expected, actual)
Конечно, можно сравнивать всё это дело через всякие прослойки типа Pandas.
Но, камон, у меня что, куча времени сидеть, читать доки и писать суперсложные датафреймы ради пары простых проверок? Нет.
Позвольте мне радостно строчить свой первозданный код с дефолтными проверками и быть довольной этим минимализмом.
Это я отвлеклась. Вот такой код у меня получается. Я не очень люблю сверять даты и названия таблиц, у меня генерит тест на лету:
# Регулярка для ISO-датавремени (подстроки)
_TIMESTAMP_RE = re.compile(r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?')
_DYNAMIC_TABLE_RE = re.compile(r'\b[a-z_][a-z0-9_]*\.qa_test_table_[a-z0-9]+\b')
def _clean_line(s: str) -> str:
s = _TIMESTAMP_RE.sub('', s)
s = _DYNAMIC_TABLE_RE.sub('SCHEMA.materialisation_table_*', s)
return s
def assert_equal(expected: str, actual: str) -> None:
"""
Сравнивает expected и actual построчно.
Перед сравнением:
- удаляет ISO-датавремя,
- нормализует динамические имена таблиц вида schema.materialisation_table_<рандом>.
Собирает все расхождения и в конце делает pytest.fail, если они есть.
"""
exp_lines = expected.splitlines()
act_lines = actual.splitlines()
errors = []
max_lines = max(len(exp_lines), len(act_lines))
for idx in range(max_lines):
line_no = idx + 1
el = exp_lines[idx] if idx < len(exp_lines) else None
al = act_lines[idx] if idx < len(act_lines) else None
if el is not None and al is not None:
cleaned_el = _clean_line(el)
cleaned_al = _clean_line(al)
if cleaned_el == cleaned_al:
continue
display_el, display_al = cleaned_el, cleaned_al
else:
display_el, display_al = el, al
if el != al:
if el is None:
errors.append(f"Line {line_no}: unexpected extra actual line: '{al}'")
elif al is None:
errors.append(f"Line {line_no}: missing actual line, expected '{el}'")
else:
errors.append(f"Line {line_no}: expected '{display_el}', got '{display_al}'")
if errors:
pytest.fail('Messages differ:\n' + '\n'.join(errors))
Итог
Ручные скрипты → боль и страдания.
Автоматизация через Python+pytest → быстро, удобно, воспроизводимо.
JSON-файлы позволяют хранить схемы и данные в читаемом виде.
Allure даёт красивые отчёты.
В итоге у вас появляется настоящий фреймворк для Data QA, который легко расширять и масштабировать.
Популярная Python-библиотека Ultralytics заразилась криптомайнером. Вредоносный код обнаружен в версиях 8.3.41 и 8.3.42, что привело к риску для тысяч устройств
Что такое Hexagonal Architecture. Показываем основные возможности применения гексагональной архитектуры в программировании. Рассматриваем пошаговую инструкцию и основные нюансы
История из первых рук о том, как незаметная «забытая» вкладка во время финальной проверки привела к 500-й ошибке, панике и спасению релиза в последний момент. Про усталость, стыд, самоиронию и то, как команды учатся на собственных провалах (иногда на горьком опыте).