Заполнение пустых значений датасета

На практике в реальных данных очень часто встречаются пропуски. Причинами могут быть ошибки ввода данных, сокрытие информации, фрод. При работе с данными часто возникает необходимость обработать пустые значения в исходных данных.

В Analytic Workspace данная задача может быть выполнена различными способами, рассмотрим пример трансформации через ETL редактор.


Переход в редактор осуществляется по нажатию кнопки в верхнем правом углу при настройке модели.

ETL-процесс

Перед запуском трансформации, механизмы ETL обеспечивают выгрузку данных из источников и сохранение их в виде parquet-файлов во внутреннем хранилище системы. В процессе выполнения трансформации, система загружает данные каждой из таблиц в распределенные Spark-датафреймы (DataFrame) и выполняет над ними SQL-запрос, который соединяет их в единую денормализованную аналитическую таблицу.
Схема взаимодействия при работе с моделью представлена на рисунке.


Таким образом существует несколько точек процесса через которые можно встроиться в процесс.
В примере будем убирать пустые значения после объединения всех таблиц, поэтому воспользуемся функцией after_all

Трансформация данных

Первым аргументом в функцию передается датафрейм с итоговой таблицей. Возвращать функция будет обработанный датафрейм. Полный код представлен ниже.

def after_all(df, spark, app, *args, **kwargs):
    """ """
    df_pd = df.toPandas()
    df_pd.fillna({'sum_spicano_casov': 0}, inplace=True)
    return spark.createDataFrame(df_pd)

Для удобства будем использовать возможности библиотеки Pandas. Преобразуем датафрейм Apache Spark в Pandas

df_pd = df.toPandas()

Для обработки пустых значений воспользуемся методом fillna. Первым параметром передаем объект с названиями столбцов и новыми значениями для пустых ячеек. Указываем параметр inplace=True для изменения датафрейма “на месте”. В примере для всех NaN значений в столбце ‘sum_spisano_casov’ установим 0. Таким образом можно явно указать к каким столбцам будут применены изменения.

df_pd.fillna({'sum_spicano_casov': 0}, inplace=True)

Для обработки значений во всех столбцах достаточно указать только новое значение для ячеек с NaN.

df_pd.fillna(0)

После внесенных изменений функция должна вернуть Apache Spark датафрейм. Для этого преобразуем фрейм Pandas обратно в Apache Spark.

return spark.createDataFrame(df_pd)

Синхронизация данных в модели

ETL скрипт можно протестировать на тестовых данных. После настройки необходимо опубликовать скрипт.


В модели необходимо загрузить обновленные данные в хранилище.
image