Использование Apache Spark в облаке Advanced

Когда IT-сфера столкнулась с необходимостью анализировать данные в таких объемах, с которыми никто никогда не работал, был создан ряд инструментов для работы с большими данными. Они впоследствии объединились в экосистему Hadoop.

Один из таких популярных инструментов — Apache Spark. Это платформа с открытым исходным кодом для реализации распределенной обработки больших данных. В отличие от классического обработчика из ядра Hadoop, реализующего двухуровневую концепцию MapReduce с хранением промежуточных данных на накопителях, Spark обрабатывает данные в оперативной памяти. Это значительно повышает скорость работы для некоторых классов задач.

В этой статье будут приведены различные решения по анализу рейтингов фильмов с помощью Apache Spark, который доступен в сервисах MapReduce Service (MRS) и Data Lake Insight (DLI) в облаке Advanced.

Постановка задачи

Необходимо получить cписок фильмов с рейтингом, например выше 4.0.

Ниже приведены исходные данные:

  • о фильмах

    movieId

    title

    genres

    1

    Дьявол носит Prada (2006)

    Комедия, драма

    2

    Однажды в… Голливуде (2019)

    Комедия, драма

    3

    Шрек (2001)

    Комедия, фэнтези

    4

    Бойцовский клуб (1999)

    Триллер, драма

  • о рейтингах, которые пользователи поставили каждому фильму

    userId

    movieId

    rating

    timestamp

    5

    1

    3.7

    123456789

    6

    2

    3.6

    123456789

    7

    3

    4.1

    123456789

    8

    4

    4.7

    123456789

Получен список фильмов с рейтингом выше 4.0:

title

average_rating

Шрек (2001)

4.1

Бойцовский клуб (1999)

4.7

Примечание

Примеры файлов с данными можно скачать в CSV-формате по ссылкам ниже:

Способы решения задачи

Apache Spark в облаке Advanced доступен в сервисах MapReduce Service и Data Lake Insight в двух видах:

  • Spark SQL

  • Spark Submit

Задача будет решаться с помощью:

  • Spark SQL без написания кода программы. Этот способ может подойти пользователям, которые не уверены в своих навыках написания кода, но знают и понимают язык SQL.

  • Spark Submit с отдельно написанными короткими программами на Python. Этот способ будет ближе разработчикам.

В рамках этой лабораторной работы будут задействованы оба сервиса и два способа.

Перед началом работы

Создайте:

  1. Кластер в MapReduce Service.

  2. Очередь в сервисе Data Lake Insight.

  3. Бакет в объектном хранилище Object Storage Service для хранения исходных данных и сохранения результатов рассчетов.

Создание кластера в MapReduce Service

MapReduce Service (MRS) — сервис для хранения, анализа и обработки больших данных. MRS предоставляет кластеры больших данных корпоративного уровня в облаке Advanced, в которых можно запускать компоненты больших данных, такие как Hadoop, Spark, HBase, Kafka и Storm. MRS полностью совместим со всеми open-source компонентами экосистемы.

Чтобы создать кластер в MapReduce Service:

  1. Войдите в консоль управления Advanced:

  2. Чтобы открыть список сервисов, нажмите Service List.

  3. Выберите EI Enterprise Intelligence → MapReduce Service.

  4. В правом верхнем углу нажмите Buy Cluster.

  5. На вкладке Quick Config:

    • Cluster Name — укажите название кластера.

    • Cluster Version — выберите версию MRS 3.1.0.

    • Component — выберите компонент Hadoop Analysis Cluster.

      ../../_images/s__mrs-1.png
    • AZ — выберите зону доступности.

    • VPC — выберите виртуальную сеть или создайте новую.

    • Subnet — выберите подсеть или создайте новую.

    • Enterprise Project — выберите необходимый проект или default.

      ../../_images/s__mrs-2.png
    • Cluster Node — нажав Редактирование настроек, можно выбрать самые простые виртуальные машины. Для этой лабораторной работы много ресурсов не потребуется.

      ../../_images/s__mrs-3.png
    • Password — задайте пароль для пользователя.

    • Confirm Password — подтвердите пароль.

      ../../_images/s__mrs-4.png
    • Secure Communications — включите опцию.

  6. Чтобы завершить создание, нажмите Buy Now.

Создание кластера занимает некоторое время. Успешная операция подтверждается статусом «Running».

Пока кластер создается, можно перейти к следующим подготовительным работам.

Создание очереди в Data Lake Insight

Data Lake Insight (DLI) — бессерверная технология для работы с большими данными в облаке.

В этой лабораторной работе необходимо создать очереди для Spark SQL и Spark Submit. Чтобы создать очередь:

  1. Войдите в консоль управления Advanced:

  2. Чтобы открыть список сервисов, нажмите Service List.

  3. Выберите EI Enterprise Intelligence → Data Lake Insight.

  4. В меню слева нажмите Queue Management.

  5. В правом верхнем углу нажмите Buy Queue.

  6. Задайте параметры:

    • Name — название очереди.

    • Queue Usage — выберите:

      • For General Purpose — для Spark Submit.

      • For SQL — для Spark SQL.

    • CU Specifications16 CUs.

    • Enterprise Project — выберите необходимый проект или default.

  7. Чтобы завершить создание, нажмите Buy.

  8. Нажмите ОК → Submit.

После создания очередь появится в списке на вкладке Queue Management.

Создание бакета OBS

Для того, чтобы загрузить файлы с исходными данными и сохранить результаты рассчетов, нужно хранилище в облаке, доступное для используемых сервисов. В облаке Advanced есть сервис хранения и управления объектным хранилищем данных Object Storage Service (OBS).

Чтобы создать бакет в OBS:

  1. Войдите в консоль управления Advanced:

  2. Чтобы открыть список сервисов, нажмите Service List.

  3. Выберите Storage → Object Storage Service.

  4. В правом верхнем углу нажмите Create Bucket.

  5. Задайте параметры:

    • Data Redundancy Policy — для этой лабораторной работы выберите Single-AZ storage.

    • Bucket Name — название бакета.

    • Enterprise Project — выберите необходимый проект или default.

      ../../_images/s__mrs-6.png
  6. Чтобы завершить создание бакета, нажмите Create Now.

    После создания бакет появится в списке.

Чтобы загрузить в бакет исходные данные:

  1. Нажмите на созданный бакет и в левом меню выберите Objects.

  2. Нажмите Upload Object.

    ../../_images/s__mrs-7.png
  3. Чтобы загрузить два CSV-файла с данными, в диалоговом окне нажмите Add File.

    ../../_images/s__mrs-8.png
  4. Нажмите Upload и дождитесь окончания загрузки файлов.

Теперь сервисы подготовлены и можно переходить к решению задачи.

Решение задачи в MRS с помощью Spark Submit

Для решения задачи можно написать код на Python, который прочитает данные по указанному пути, соберет данные и запишет в выходной каталог.

  1. Создайте файл spark-mrs-1.py и добавьте в него следующий код:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as f
    import sys
    
    movies_path = sys.argv[1]
    ratings_path = sys.argv[2]
    save_path = sys.argv[3]
    
    spark = SparkSession.builder.appName('Movie_Rating_Above_4').getOrCreate()
    
    df_movies = spark.read.csv(movies_path, header=True, inferSchema=True)
    df_ratings = spark.read.csv(ratings_path, header=True, inferSchema=True)
    
    df_movie_rating = df_movies.join(df_ratings, "movieId", "left")
    
    df_calculated = df_movie_rating.groupBy("title").agg(f.avg("rating").alias("avg_rating"))
    
    df_calculated_above_four = df_calculated.filter("avg_rating > 4").select(["title","avg_rating"])
    
    df_calculated_above_four.write.csv(save_path, header=True)
    
    Этот код:
    • Принимает несколько аргументов:

      • путь к файлу с фильмами;

      • путь к файлу с рейтингами;

      • путь до папки, в которой необходимо сохранить результат.

    • Считывает данные из файлов с фильмами и рейтингами.

    • Объединяет данные в один массив по идентификатору фильма.

    • Агрегирует данные по фильмам и считает средний рейтинг (рейтинг добавляется как столбец avg_rating).

    • Фильтрует полученный массив данных.

    • Сохраняет данные в формате csv в папку.

Для запуска задания в сервисе MRS необходимо также загрузить файл spark-mrs-1.py в бакет OBS, в который были загружены файлы с данными. Чтобы загрузить:

  1. Войдите в консоль управления Advanced:

  2. Чтобы открыть список сервисов, нажмите Service List.

  3. Выберите Storage → Object Storage Service.

  4. Нажмите на созданный бакет и в левом меню выберите Objects.

  5. Нажмите Upload Object.

  6. Чтобы загрузить spark-mrs-1.py с кодом, в диалоговом окне нажмите Add File.

  7. Нажмите Upload и дождитесь окончания загрузки файла.

После того, как файл был загружен, запустите задание в сервисе MRS. Для этого:

  1. Войдите в консоль управления Advanced:

  2. Чтобы открыть список сервисов, нажмите Service List.

  3. Выберите EI Enterprise Intelligence → MapReduce Service.

  4. В списке нажмите на название ранее созданного кластера.

  5. Перейдите на вкладку Jobs и нажмите Create.

  6. Задайте параметры:

    • TypeSparkSubmit.

    • Name — название задания.

    • Program Path — нажав кнопку OBS, выберите бакет, а затем загруженный ранее в него файл spark-mrs-1.py. Активируйте опцию «I confirm that the selected script is secure, and I understand the potential risks and accept the possible exceptions or impacts on the cluster» и нажмите ОК.

    • Parameters — через пробел укажите путь до файлов movies.csv, ratings.csv, а также путь до папки, куда необходимо сохранить результат (папки не должно существовать).

      Пример

      obs://bucket-test/movies.csv obs://bucket-test/ratings.csv obs://bucket-test/output

      ../../_images/s__mrs-9.png
  7. Нажмите ОК.

Задание отображается в списке со статусом «Submitted». Подождите несколько минут и обновите страницу. Статус задания изменится на «Completed», а результат — на «Successful».

Чтобы проверить результат выполнения:

  1. Откройте бакет OBS, путь до которого был указан в процессе создания задания.

    В бакете появилась папка для сохранения результатов.

  2. Нажмите на название папки.

    В ней содержится файл _SUCCESS, а также набор CSV-файлов с результатами.

  3. Скачайте любой из файлов и проверьте, что результат соответствует условиям задачи.

Решение задачи в MRS с помощью Spark SQL

Теперь попробуйте решить ту же самую задачу с использованием языка SQL и без написания кода на Python. Необходим следующий SQL-запрос (измените название бакета на ваше):

CREATE TEMPORARY VIEW movies (
    movieId int,
    title string,
    genres string
)
USING csv
OPTIONS (
  path 'obs://bucket-test/movies.csv',
  header true
);

CREATE TEMPORARY VIEW ratings (
    userId int,
    movieId int,
    rating float,
    timestamp string
)
USING csv
OPTIONS (
  path 'obs://bucket-test/ratings.csv',
  header true
);

INSERT OVERWRITE DIRECTORY 'obs://bucket-test/output' USING CSV
SELECT movies.title, AVG(ratings.rating) as avg_rating
FROM movies LEFT JOIN ratings ON movies.movieId = ratings.movieId
GROUP BY movies.title
HAVING avg_rating > 4

Этот скрипт выполняет то же самое, что и скрипт Python в предыдущем задании:

  • Подключается к файлам в OBS.

  • Агрегирует данные.

  • Сохраняет результат в каталог на OBS.

Чтобы запустить задание:

  1. Войдите в консоль управления Advanced:

  2. Чтобы открыть список сервисов, нажмите Service List.

  3. Выберите EI Enterprise Intelligence → MapReduce Service.

  4. В списке нажмите на название ранее созданного кластера.

  5. Перейдите на вкладку Jobs и нажмите Create.

  6. Задайте параметры:

    • TypeSparkSql.

    • Name — название задания.

    • SQL TypeScript. Введите скрипт SQL, указанный ранее.

      ../../_images/s__mrs-11.png
  7. Нажмите ОК.

Задание отображается в списке со статусом «Running». Подождите несколько минут и обновите страницу. Статус задания изменится на «Completed», а результат на «Successful».

Чтобы проверить результат выполнения:

  1. Откройте бакет OBS, путь до которого был указан в процессе создания задания.

    В бакете появилась папка для сохранения результатов.

  2. Нажмите на название папки.

    В ней содержится файл _SUCCESS, а также набор CSV-файлов с результатами.

  3. Скачайте любой из файлов и проверьте, что результат соответствует условиям задачи.

Решение задачи в DLI с помощью Spark Submit

Теперь можно обработать те же самые данные с помощью сервиса DLI. То есть вы не создаете кластер, не просматриваете виртуальные машины и не отслеживаете их состояние. Единственное, что доступно для настройки — параметры очереди, в которой будет выполняться задание.

Запустите ваш скрипт на Python в DLI (Для получения и сохранения данных также будет использоваться OBS):

  1. Войдите в консоль управления Advanced:

  2. Чтобы открыть список сервисов, нажмите Service List.

  3. Выберите EI Enterprise Intelligence → Data Lake Insight.

  4. Выберите Job Management → Spark Jobs.

  5. В правом верхнему углу нажмите Create Job.

  6. На вкладке Fill Form задайте параметры:

    • Queue — очередь, которая была создана в начале работы для типа задания Spark Submit.

    • Job Name — название задания.

    • Application — с помощью кнопки справа выберите созданный бакет OBS, а затем файл spark-mrs-1.py.

    • Application Parameters — на отдельных строках укажите путь до файла movies.csv, ratings.csv, а также путь до папки, куда необходимо сохранить результат (папки не должно существовать).

      Пример

      obs://bucket-test/movies.csv
      obs://bucket-test/ratings.csv
      obs://bucket-test/output
      
      ../../_images/s__mrs-13.png
  7. В правом верхнем углу нажмите Execute.

На странице Spark Jobs в списке появится созданное задание со статусом «Starting». Подождите несколько минут и обновите страницу. Статус запроса изменится на «Finished».

Чтобы проверить результат выполнения:

  1. Откройте бакет OBS, путь до которого был указан в процессе создания задания.

    В бакете появилась папка для сохранения результатов.

  2. Нажмите на название папки.

    В ней содержится файл _SUCCESS, а также набор CSV-файлов с результатами.

  3. Скачайте любой из файлов и проверьте, что результат соответствует условиям задачи.

Решение задачи в DLI с помощью Spark SQL

Теперь выполните анализ данных с помощью Spark SQL, который представлен в виде пользовательского интерфейса в DLI. Нужно будет создать мета-базу данных и мета-таблицы для работы. Они не будут содержать в себе данные, но будут ссылаться на данные в наших CSV-файлах, хранящихся в OBS. После этого вы сможете написать SQL-запрос для анализа данных из этих мета-таблиц.

  1. Войдите в консоль управления Advanced:

  2. Чтобы открыть список сервисов, нажмите Service List.

  3. Выберите EI Enterprise Intelligence → Data Lake Insight.

  4. В меню слева выберите SQL Editor.

  5. Нажмите Добавление параметра на вкладке Databases.

    ../../_images/s__mrs-14.png
  6. В диалоговом окне задайте параметры:

    • Database Name — название базы данных.

    • Enterprise Project — выберите необходимый проект или default.

      ../../_images/s__mrs-15.png
  7. Чтобы завершить создание базы данных, нажмите ОК.

  8. Нажмите на созданную базу данных. Откроется поле Table.

    ../../_images/s__mrs-16.png
  9. Нажмите на Добавление параметра и задайте параметры таблицы movies:

    • Name — название таблицы.

    • Data LocationOBS.

    • Data FormatCSV.

    • Bucket Location — с помощью кнопки справа выберите созданный бакет OBS, а затем файл movies.csv.

  10. Опишите столбцы по примеру ниже. Добавлять столбцы можно кнопкой Добавление параметра в правой части таблицы.

    Column Name (Название столбца)

    Type (Тип данных)

    movieId

    int

    title

    string

    genres

    string

    ../../_images/s__mrs-17.png
  11. Включите опцию Advanced settings и активируйте Table Header.

  12. Нажмите ОК.

  13. По аналогии создайте таблицу ratings со столбцами:

    Column Name (Название столбца)

    Type (Тип данных)

    userId

    int

    movieId

    int

    rating

    double

    timestamp

    string

    ../../_images/s__mrs-18.png
  14. Нажмите ОК.

  15. После того, как таблицы описаны, в правой части можно выполнить SQL запрос. Для этого вставьте запрос ниже в поле:

    INSERT OVERWRITE DIRECTORY 'obs://bucket-test/output'
    USING csv
    SELECT movies.title, AVG(ratings.rating) as avg_rating
    FROM movies LEFT JOIN ratings ON movies.movieId = ratings.movieId
    GROUP BY movies.title
    HAVING avg_rating > 4
    
  16. В верхней части поля нажмите Execute.

Статус выполнения запроса отобразится в нижней части экрана на вкладке Executed Queries.

Чтобы просмотреть статус выполнения задачи, в меню слева выберите Job Management → SQL Jobs. Успешная операция подтверждается статусом «Finished».

(Опционально) Сборка файлов в один с помощью FunctionGraph

Для сборки этих файлов в один можно использовать функцию FunctionGraph, которая будет запускаться при создании файла в заданном ранее каталоге объектного хранилища с именем _SUCCESS.

Код функции:

# -*- coding: utf-8 -*-
from com.obs.client.obs_client import ObsClient
import io
import datetime

def handler(event, context):
  date = datetime.datetime.now()
  AK = context.getAccessKey()
  SK = context.getSecretKey()
  server = 'obs.ru-moscow-1.hc.sbercloud.ru'
  bucketName = 'bucket-test'

  obsClient = ObsClient(access_key_id=AK, secret_access_key=SK, server=server)

  csv_mem = io.StringIO()

  list_objects = obsClient.listObjects(bucketName, prefix='output')
  for content in list_objects.body.contents:
    if (content.key.find('.csv') != -1):
      objectKey = str(content.key)

      resp = obsClient.getObject(bucketName, objectKey, loadStreamInMemory=True)
      if resp.body != None:
        download_object = resp.body.buffer
        csvdata = download_object.decode("utf-8")
        csv_mem.write(csvdata)

  objectKey = f"output_{date}.csv"
  obsClient.putContent(bucketName, objectKey, csv_mem.getvalue())

  return "OK"
Запустили Evolution free tier
для Dev & Test
Получить