Перетяжка, Премия ТПрогер, 13.11
Перетяжка, Премия ТПрогер, 13.11
Перетяжка, Премия ТПрогер, 13.11

Устраиваем свой Data QA с PyTest и фикстурами

В статье показан базовый, но рабочий подход к тестированию трансформации данных в ETL-процессах. Автор, на примере Python-проекта с pytest, allure и psycopg2, демонстрирует, как автоматизировать создание и наполнение таблиц, хранить схемы и данные в JSON, поднимать фикстуры для работы с БД и API, а затем сравнивать результат. Всё это — с живым стилем, юмором и кучей кода.

651 открытий6К показов
Устраиваем свой Data QA с PyTest и фикстурами

Как тестировать перенос и трансформацию данных без боли

Я всё думаю над тем, о чём говорить на своём канале. Казалось бы, инфы в интернете — вагон и маленькая тележка: бери, читай, вдохновляйся. Но вот что удивительно — про то, как тестировать данные, как они перетекают из одной таблицы в другую и не превращаются в тыкву, пишут мало.

А зря.

За свою практику я встречала разные подходы к организации Data QA. В этой статье я расскажу про самый простой и быстрый вариант, который можно «собрать на коленке», а потом уже развивать. С примерами кода и тем, зачем SQL нам нужен в этой жизни.

И небольшой дисклеймер. Если вы знаете, как сделать это на Python стильно модно молодежно, обязательно напишите мне об этом в комментариях, я с удовольствием почитаю про ваш подход.

Также подписывайтесь на мой ТГ, который я почти не веду.

Базовый сценарий

  1. Есть таблица-источник source_table.
  2. Разработчик запускает свой код (Spark, Airflow или что у него там), и данные магическим образом перекладываются + преобразуются.
  3. На выходе получаем result_table.

Вопрос: как это тестировать?

Самый быстрый и простой путь

Подготовка:

  1. Создаём таблицу-источник.
  2. Наполняем её данными (синтетика или реальные примеры с прода).

Действие:

  1. Запускаем ETL код.
  2. Сравниваем данные «как было» и «как стало».

После теста:

  1. Удаляем таблицу-источник.
  2. Удаляем таблицу-приёмник.

На коленке это можно собрать так:

  1. CREATE TABLE …
  2. INSERT INTO …
  3. Ручной запуск пайплайна (Spark/Airflow).
  4. Какой-нибудь скрипт для сравнения результатов.

Рабочая схема? Да.

Удобная? Нет.

Где хранить эти скрипты?

Можно, конечно, прилепить их к задаче в Jira в виде файликов. Но выглядеть это будет как «фигня на палке». Через месяц никто не вспомнит, что это было и зачем.

Поэтому я решила собрать свой Python-проект для автоматизации. А вы можете склепать себе такой же, и жизнь сразу станет легче.

Архитектура проекта

			tests/
├── api/                      # Тесты API
├── database/                 # Тесты БД
├── utils/                    # Хелперы
│   ├── api_helper.py         
│   ├── assert_helper.py      
│   ├── postgre_sql_helper.py 
│   └── utils.py              
└── conftest.py               # Общие фикстуры
		

Минимальный стек

			pytest   # для запуска тестов
allure   # для красивых отчётов
psycopg2 # для работы с PostgreSQL (или вашей БД)
requests # для триггеров пайплайнов
		

1. Utils — мелкие, но полезные

Здесь мы храним всякую мелочёвку. Например, функцию для открытия JSON:

			def open_json_file(path: str) -> List[Dict[str, Any]]:
 """Читает и парсит JSON-файл, возвращая список словарей."""
    file_path = Path(path)

    if not file_path.exists():
        raise FileNotFoundError(f'JSON файл не найден: {file_path}')

    if not file_path.is_file():
        raise IsADirectoryError(f'Указанный путь является директорией: {file_path}')

    try:
        return json.loads(file_path.read_text(encoding='utf-8'))
    except json.JSONDecodeError as e:
        raise json.JSONDecodeError(f'Ошибка декодирования JSON: {file_path}', e.doc, e.pos) from e
		

Важно отметить, что данные я всегда стараюсь хранить в виде массива с json. Так удобнее использовать.

2. APIClient — для триггеров

Здесь всё максимально тривиально: клиент для вызова API и запуска ETL джоб.

3. PostgreSQLClient — основной игрок

Класс для подключения, запросов, создания таблиц, вставки данных и удаления.

			class PostgreSQLClient:
    """Клиент для работы с PostgreSQL с поддержкой подключения и выполнения запросов."""    

    def __init__(self,
                 host: str = 'localhost',
                 port: int = 5432,
                 dbname: str = 'postgres',
                 user: str = 'postgres',
                 password: str = '',
                 autocommit: bool = False):
        self.connection_params = {
            'host': host,
            'port': port,
            'dbname': dbname,
            'user': user,
            'password': password,
        }
        self.autocommit = autocommit
        self.conn = None
        self.cursor = None      

    def connect(self) -> None:
        """Установка соединения с базой данных."""
        try:
            extensions.adapters = {}  # Очищаем все адаптеры
            self.conn = psycopg2.connect(**self.connection_params)
            self.conn.autocommit = self.autocommit
            self.cursor = self.conn.cursor(cursor_factory=DictCursor)
        except psycopg2.Error as e:
            raise RuntimeError(f'Ошибка подключения: {e}') from e

    def disconnect(self) -> None:
        """Закрытие соединения."""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()

    def execute_query(self,
                      query: Union[str, sql.Composable],
                      params: Optional[Tuple] = None,
                      fetch: bool = False) -> Optional[List[Dict[str, Any]]]:
        """Выполнение SQL-запроса."""
        try:
            self.cursor.execute(query, params)
            self.conn.commit()
            if fetch:
                return self.cursor.fetchall()
        except psycopg2.Error as e:
            self.conn.rollback()
            raise RuntimeError(f'Ошибка выполнения запроса: {e}\nЗапрос: {query}') from e
        return None
		

Фикстуры (conftest.py)

API фикстуры

Я показываю пример, как поднимаю один API клиент. Но у вас их может быть много: например, разные сервисы или разные перекладчики.

			@pytest.fixture(scope='session')
def api_config():
    config = {
        'base_url': os.getenv('API_BASE_URL'),
        'username': os.getenv('API_USERNAME'),
        'password': os.getenv('API_PASSWORD'),
    }
    if not all(config.values()):
        missing = [k for k, v in config.items() if not v]
        pytest.fail(f'Не заданы переменные окружения: {missing}')
    return config


@pytest.fixture(scope='session')
def api_client(api_config):
    client = APIClient(base_url=api_config['base_url'])
    client.authenticate(
        username=api_config['username'],
        password=api_config['password'],
    )
    return client
		

DB фикстура

Пример для дефолтной PostgreSQL, но вы можете добавить фикстуры для поднятия GreenPlum или ClickHouse.

			@pytest.fixture(scope='session')
def postgres_client() -> Generator[PostgreSQLClient, None, None]:
    client = PostgreSQLClient(
        host=os.getenv('PG_HOST'),
        dbname=os.getenv('PG_DB_NAME'),
        user=os.getenv('PG_DB_USER'),
        password=os.getenv('PG_DB_PASSWORD'))
    client.connect()
    yield client
    client.disconnect()
		

Как хранить данные?

А теперь самый сок, которым я обмазываюсь сейчас.

Конечно, можно создать отдельный скрипт на чистом sql для создания и редактирования своих табличек: больших и маленьких. Но, если честно, это так *цензура* неудобно.

Поэтому я придумала хранить схему и данные в отдельных json файлах и налету генерить скрипты. Да, Америку я вновь не открываю, и очень жаль.

В директории тестов я создала папку resources, где храню эти файлы.

			[
  {
    "column_name": "col_integer",
    "data_type": "INTEGER",
    "is_nullable": "YES",
    "column_default": null  
  }
]
		
			[
  {
    "col_integer": 180
  }
]
		

Конечно, в таком виде вы не передадите ничего в Postgres и ничего не слепите. Так что я услужливо предоставляю вам код, чтобы вы его копировали и вдохновлялись.

			def create_table_from_columns(
            self,
            table: str,
            columns: list[dict[str, Any]],
            if_not_exists: bool = True,
    ) -> None:

        if not columns:
            raise ValueError("columns must be a non-empty list of column specs")

        fragments = [sql.SQL("CREATE TABLE")]
        if if_not_exists:
            fragments.append(sql.SQL("IF NOT EXISTS"))
        fragments.append(self._ident_from_dotpath(table))

        col_defs: list[sql.SQL] = []
        for col in columns:
            name = col["column_name"]
            data_type = col["data_type"]
            is_nullable = (str(col.get("is_nullable", "YES")).upper() != "NO")
            default = col.get("column_default", None)
            constraint = col.get("constraint", None)

            parts: list[sql.SQL] = [
                sql.Identifier(name),
                sql.SQL(data_type),
            ]

            if not is_nullable:
                parts.append(sql.SQL("NOT NULL"))

            if default is not None:
                parts.append(sql.SQL("DEFAULT"))
                parts.append(sql.SQL(str(default)))

            if constraint:
                parts.append(sql.SQL(constraint))

            col_defs.append(sql.SQL(" ").join(parts))

        query = sql.SQL(" ").join(fragments) + sql.SQL(" ({})").format(sql.SQL(", ").join(col_defs))
        self.execute_query(query)

    def drop_table(self, table: str, *, cascade: bool = True, if_exists: bool = True) -> None:
        """
        Удаляет таблицу.

        :param table: Имя таблицы (можно 'schema.table' или просто 'table')
        :param cascade: Добавить CASCADE (по умолчанию True)
        :param if_exists: Добавить IF EXISTS, чтобы не падать, если таблицы нет (по умолчанию True)
        """
        fragments = [sql.SQL("DROP TABLE")]
        if if_exists:
            fragments.append(sql.SQL("IF EXISTS"))
        fragments.append(self._ident_from_dotpath(table))
        if cascade:
            fragments.append(sql.SQL("CASCADE"))

        query = sql.SQL(" ").join(fragments) + sql.SQL(";")
        self.execute_query(query)

    def insert_data(self, table: str, data: dict[str, Any]) -> None:
        """
        Вставка данных в таблицу.

        :param table: Имя таблицы
        :param data: Данные для вставки {column: value}
        """
        columns = sql.SQL(', ').join(map(sql.Identifier, data.keys()))
        values = sql.SQL(', ').join(sql.Placeholder() * len(data))
        query = sql.SQL('INSERT INTO {} ({}) VALUES ({})').format(
            self._ident_from_dotpath(table),
            columns,
            values,
        )
        self.execute_query(query, tuple(data.values()))
		

Вы можете взять и пропихнуть эти куски кода прямо в тело теста, но я предпочитаю оборачивать это дело в фикстуры-управленцы, которые будут делать это за меня и записывать отдельными шагами в Алюр:

			@pytest.fixture
def create_table(request):
    def _create(db_client:PostgreSQLClient, columns: list[dict[str, Any]], table_name: str):
        with allure.step(f"Создание таблицы {table_name}"):
            db_client.create_table_from_columns(table=table_name, columns=columns,)
        return table_name

    return _create

@pytest.fixture
def drop_table(request):
    def _drop(db_client:PostgreSQLClient, table_name: str):
        with allure.step(f"Удаление таблицы {table_name}"):
            db_client.drop_table(table=table_name)
        return table_name
    return _drop

@pytest.fixture(scope="session")
def insert_table_data():
    def _insert(db_client: PostgreSQLClient, table_name: str, data: list[dict[str, Any]]):
        total = len(data)
        success = 0
        warnings: list[str] = []

        with allure.step(f"Добавление данных в таблицу {table_name}"):
            for d in data:
                try:
                    db_client.insert_data(table=table_name, data=d)
                    success += 1
                except Exception as e:
                    msg = f"⚠️ Пропущена строка {d} из-за ошибки: {e}"
                    warnings.append(msg)
                    allure.attach(msg, name="Warning", attachment_type=allure.attachment_type.TEXT)
                    logger.warning(msg)

            summary = f"Успешно добавлено: {success} из {total}"
            if warnings:
                tail = warnings[:50]
                if len(warnings) > 50:
                    tail.append(f"... и ещё {len(warnings) - 50} предупреждений")
                summary += "\n\nWarnings:\n" + "\n".join(tail)

            allure.attach(
                summary,
                name=f"Результат вставки в {table_name}",
                attachment_type=allure.attachment_type.TEXT
            )

        return {"success": success, "total": total, "warnings": warnings}

    return _insert
		

Отдельное удовольствие мне даёт вот эта строчка кода db_client:PostgreSQLClient.Она значит, что если так вышло, что у меня есть куча разных PostgesSql, я могу в эту фикстуру прокинуть нужный мне клиент без дополнительных приседаний. И это, конечно, прекрасно.

Пример: подготовка данных

			@pytest.fixture
def prepare_db(greenplum_client, create_table, insert_table_data, drop_table):
    # Создаём таблицу
    schema = open_json_file('resources/schema.json')
    table_name = f'public.qa_test_{generate_lowercase_name()}'
    create_table(greenplum_client, table_name, schema)

    # Вставляем данные
    data = open_json_file('resources/data.json')
    insert_table_data(greenplum_client, table_name, data)

    yield {"table_name": table_name, "data": data}

    # Чистим за собой
    drop_table(greenplum_client, table_name)
		

Самые внимательные уже заметили, что в примере выше я поднимала postgres-клиента.

Так что разницы почти никакой (ну, кроме тех случаев, когда внезапно очень даже есть 🙃).

И сам тест

			def test_materialisation_with_valid_data(
        prepare_db,
        meta_postgres_client,
        data_gate_api_client,
):
    source_table = prepare_db['table_name']
    expected = prepare_db['data']

    # Запуск пайплайна
    data_gate_api_client.trigger_job(source_table)

    # Читаем результат
    actual = meta_postgres_client.execute_query(
        f"SELECT * FROM result_table WHERE source='{source_table}'",
        fetch=True
    )

    # Сравниваем
    with allure.step('Сравнение результатов'):
        assert_equal(expected, actual)
		

Конечно, можно сравнивать всё это дело через всякие прослойки типа Pandas.

Но, камон, у меня что, куча времени сидеть, читать доки и писать суперсложные датафреймы ради пары простых проверок? Нет.

Позвольте мне радостно строчить свой первозданный код с дефолтными проверками и быть довольной этим минимализмом.

Это я отвлеклась. Вот такой код у меня получается. Я не очень люблю сверять даты и названия таблиц, у меня генерит тест на лету:

			# Регулярка для ISO-датавремени (подстроки)
_TIMESTAMP_RE = re.compile(r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?')
_DYNAMIC_TABLE_RE = re.compile(r'\b[a-z_][a-z0-9_]*\.qa_test_table_[a-z0-9]+\b')

def _clean_line(s: str) -> str:
    s = _TIMESTAMP_RE.sub('', s)
    s = _DYNAMIC_TABLE_RE.sub('SCHEMA.materialisation_table_*', s)
    return s

def assert_equal(expected: str, actual: str) -> None:
    """
    Сравнивает expected и actual построчно.
    Перед сравнением:
      - удаляет ISO-датавремя,
      - нормализует динамические имена таблиц вида schema.materialisation_table_<рандом>.
    Собирает все расхождения и в конце делает pytest.fail, если они есть.
    """
    exp_lines = expected.splitlines()
    act_lines = actual.splitlines()
    errors = []
    max_lines = max(len(exp_lines), len(act_lines))

    for idx in range(max_lines):
        line_no = idx + 1
        el = exp_lines[idx] if idx < len(exp_lines) else None
        al = act_lines[idx] if idx < len(act_lines) else None

        if el is not None and al is not None:
            cleaned_el = _clean_line(el)
            cleaned_al = _clean_line(al)
            if cleaned_el == cleaned_al:
                continue
            display_el, display_al = cleaned_el, cleaned_al
        else:
            display_el, display_al = el, al

        if el != al:
            if el is None:
                errors.append(f"Line {line_no}: unexpected extra actual line: '{al}'")
            elif al is None:
                errors.append(f"Line {line_no}: missing actual line, expected '{el}'")
            else:
                errors.append(f"Line {line_no}: expected '{display_el}', got '{display_al}'")

    if errors:
        pytest.fail('Messages differ:\n' + '\n'.join(errors))
		

Итог

  • Ручные скрипты → боль и страдания.
  • Автоматизация через Python+pytest → быстро, удобно, воспроизводимо.
  • JSON-файлы позволяют хранить схемы и данные в читаемом виде.
  • Allure даёт красивые отчёты.

В итоге у вас появляется настоящий фреймворк для Data QA, который легко расширять и масштабировать.

Следите за новыми постами
Следите за новыми постами по любимым темам
651 открытий6К показов