Работа с web-сервисами

При работе в системе у пользователя может возникнуть потребность загружать данные напрямую из удаленных ресурсов, а также передавать web-сервисам данные для записи, информировать через мессенджеры.

Так как работа с сервисами достаточно разносторонняя, то на данный момент было решено дать возможность работать с ними не через соединения в системе, а через редактор ETL, который позволяет реализовать разнообразную логику, оперируя нужными методами в каждой из точек расширения процесса трансформации модели данных. Подробнее здесь: Справка

Рассмотрим несколько примеров

1. Получение данных в формате JSON
2. Получение данных в формате CSV
3. Создание записи в БД удаленного web-сервиса
4. Отправка данных в чат Telegram

Примеры 1 и 2 позволяют увидеть, что обработка входящих данных различных форматов имеет свои особенности, поэтому эти пункты выделены отдельно.

1. Получение данных в формате JSON

Задача: необходимо получить данные в json формате с web-сервиса и использовать их в общем ETL процессе. Набор данных: ‘https://api.covidtracking.com/v1/us/daily.json

Алгоритм решения:

  1. Проанализируем исходный набор данных и определим заголовки столбцов:
['at', 'positive', 'hospitalizedcurrently', 'hospitalizedcumulative',
 'onventilatorcurrently', 'onventilatorcumulative']
  1. В системе создадим источник данных на основе таблицы Excel, содержащей указанные заголовки столбцов, чтобы сформировать модель данных куда данные будут записаны. В будущем будет возможность создавать виртуальные таблицы автоматически, а пока мы сделаем это вручную. Таблица пока не заполнена данными.

  2. Создадим модель данных на основе источника.

  3. Импортируем необходимые библиотеки:

import requests
import datetime
from pyspark.sql import Row
  1. Добавим функцию, по итогам выполнения которой, в таблицу источника будут записаны данные:
def after_load_covid(df, spark, app, *args, **kwargs):
   
    r = requests.get('https://api.covidtracking.com/v1/us/daily.json')
    if not r.ok:
        raise Exception(f'Ошибка GET {r.url}. HTTP {r.status_code}: {r.text}')
    
    #Создадим список, который послужит основной для создания DataFrame
    rows = []
    for line in r.json():
        rows.append(Row(
            at=datetime.datetime.strptime(str(line['date']), '%Y%m%d'),
            positive=line['positive'],
            hospitalizedcurrently=line['hospitalizedCurrently'],
            hospitalizedcumulative=line['hospitalizedCumulative'],
            onventilatorcurrently=line['onVentilatorCurrently'],
            onventilatorcumulative=line['onVentilatorCumulative'],
        ))

    return spark.createDataFrame(rows)

В итоге, данные успешно записаны в DataFrame:

2. Получение данных в формате CSV

Задача: необходимо получить данные в csv формате с web-сервиса и использовать их в общем ETL процессе.
Набор данных: ‘https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2021-financial-year-provisional/Download-data/annual-enterprise-survey-2021-financial-year-provisional-csv.csv

Алгоритм решения пошагово идентичен примеру 1, однако на уровне кода обработка полученных данных имеет ряд особенностей, что связано с форматом загружаемых данных:

  1. Проанализируем исходный набор данных и определим заголовки столбцов:
 ['Year', 'Industry_aggregation_NZSIOC', 'Industry_code_NZSIOC', 
  'Industry_name_NZSIOC',  'Units', 'Variable_code', 'Variable_name', 
  'Variable_category', 'Value',  'Industry_code_ANZSIC06']
  1. В системе создадим источник данных на основе таблицы Excel, содержащий указанные заголовки столбцов. Таблица пока не заполнена данными.

  2. Создадим модель данных на основе источника:

  3. Импортируем необходимые библиотеки:

import requests
import csv
  1. Добавим функцию, по итогам выполнения которой, в таблицу источника будут записаны данные:
def after_load_csvdata(df, spark, app, *args, **kwargs):
   
    r = requests.get('https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2021-financial-year-provisional/Download-data/annual-enterprise-survey-2021-financial-year-provisional-csv.csv')
    
    #Преобразуем ответ от сервиса в список списков (матрицу), который потребуется для создания DataFrame
    lines = r.text.splitlines()
    csv_data = csv.reader(lines)
    
    data = []
    for row in csv_data:
        data.append(row)

    #Удалим самый первый список с заголовками из набора списков, 
    чтобы не дублировать их в итоговом DataFrame
    del data[0]
    
    columns = ['Year', 'Industry_aggregation_NZSIOC', 'Industry_code_NZSIOC',          
               'Industry_name_NZSIOC', 'Units', 'Variable_code', 'Variable_name',            
               'Variable_category', 'Value', 'Industry_code_ANZSIC06']
    
    return spark.createDataFrame(data,columns)

В итоге, данные успешно записаны в DataFrame:

3. Создание записи в БД удаленного web-сервиса

Задача: необходимо передать данные для записи (‘name’, ‘email’, ‘password’) стороннему web-сервису Users (bugred.ru), чтобы создать новых пользователей.

Алгоритм решения:

  1. В системе модель данных содержит таблицу пользователей со столбцами ‘name’, ‘email’:

  1. После загрузки данных в модель нам необходимо в редакторе кода написать POST запрос. Импортируем необходимые библиотеки:
import requests
  1. Далее напишем функцию, которая, используя данные DataFrame в качестве параметров, направляет POST запрос на создание пользователя:
def after_load_users(df, spark, app, *args, **kwargs):
    
    #Помещаем каждый столбец в список с помощью lambda-функции
    user_name_list = df.rdd.map(lambda x: x[1]).collect()
    user_email_list = df.rdd.map(lambda x: x[2]).collect()

    #В качестве параметров указываем первый элемент из созданных списков
    и необходимый пароль
    param = {'email': user_email_list[1], 'name': user_name_list[1], 'password': '123' }

    #Создаем POST запрос, в котором указываем url и параметры запроса
    resp = requests.post('http://users.bugred.ru/tasks/rest/doregister', data=param)  
    print(resp.text)

    return df

Список имен

Список email

В итоге, на удаленном ресурсе с помощью web-сервиса создан пользователь с указанными параметрами:

В нашем примере тестовый сервис не позволяет направлять запросы из цикла.
В случае работы с другими web-сервисами создание записей происходит в цикле, например:

for i in range(len(user_name_list)):
    param = {'name': user_name_list[i]}
    resp = requests.post('url', data=param)

4. Отправка данных в чат Telegram

Задача: направить пользователю сообщение через TelegramBot после обработки данных.

Алгоритм решения:

  1. В системе AW создана модель данных, произведена запись данных в DataFrame (Ранее рассмотренный пример: 2. Получение данных в формате CSV и запись их в DataFrame).

  2. Перейдем в редактор, импорт дополнительных библиотек, кроме “Requests”, не требуется. Нам необходимо создать POST запрос и указать его параметры, используя token бота, метод (отправка текстового сообщения), содержание сообщения, chat_id (ссылка на общий или индивидуальный чат бота с пользователем). Поместим код в функцию, выполняемую в точке расширения ‘after_all()’:

def after_all(df, spark, app, *args, **kwargs):
        
    #Создаем POST запрос с параметрами 'method', 'chat_id', 'text' для отправки с помощью TelegramBot   
    token='****************************************************************'
    url = 'https://api.telegram.org/bot'
    url += token
    method = url + '/sendMessage'
    r = requests.post(method, data={'chat_id': **********, 'text': 'Данные из стороннего сервиса загружены'})

    return df

После загрузки данных в Dataframe Telegram-бот направил пользователю сообщение:

4 лайка