Создание корпоративного хранилища данных на платформе Advanced с помощью ClickHouse в составе кластера MapReduce

В больших компаниях при формировании отчетности или для обеспечения работы аналитиков принято строить корпоративные хранилища данных. Они способны обрабатывать большие объемы информации и делать это быстрее классических СУБД.

Для хранения такой информации принято использовать базы данных типа Massive Parallel Processing. Они позволяют быстрее выполнять аналитические запросы за счет:

  • возможности разделить информацию из таблиц для хранения на нескольких серверах;

  • возможности проанализировать входящий SQL-запрос и запустить его параллельно на нескольких серверах для обработки части информации;

  • возможности собрать результаты выполнения параллельных запросов и вернуть клиенту результирующий набор данных.

Такой MPP-базой данных является ClickHouse, который в облаке Advanced доступен в качестве компонента сервиса MapReduce (MRS). ClickHouse зарекомендовал себя как очень быстрая СУБД для работы с очень большими объемами данных.

Сценарий

В рамках практической работы нужно будет развернуть кластер сервиса MapReduce с ClickHouse, а так же провести несколько экспериментов по загрузке данных и выполнению запросов к ним. Запросы будут выполняться к базе данных c информацией о фильмах и их оценках от пользователей.

База данных состоит из таблиц:

movies

movieId

title

genres

1

Дьявол носит Prada (2006)

Комедия, драма

2

Однажды в… Голливуде (2019)

Комедия, драма

3

Шрек (2001)

Комедия, фэнтези

ratings

userId

movieId

rating

timestamp

1

2

4.0

2018-03-27 00:51:45

2

4

3.0

2018-03-26 23:37:22

Задача состоит в том, чтобы получить список всех фильмов с их средней оценкой.

Перед началом работы

ClickHouse представляет собой колоночную СУБД, которая может хранить данные на множестве независимых серверов. При планировании хранилища данных нужно учесть некоторые возможности и особенности ClickHouse.

Движок таблицы

Движок таблицы определяет:

  • как и где хранятся данные;

  • куда данные записывать и откуда читать;

  • какие запросы поддерживаются;

  • возможен ли параллельный доступ к данным;

  • возможно ли использование индексов;

  • возможно ли выполнение многопоточного запроса;

  • параметры репликации данных.

Первичный ключ

Первичный ключ в ClickHouse не является уникальным для каждой записи. Данные при записи на диск сортируются в соответствии с выбранным первичным ключом. При достижении 8 192 строк или 10 МБ данных создается запись (засечка) в файле индекса первичного ключа. Каждая такая партия — это гранулярность индекса. Концепция гранулярности создает разреженный индекс, который легко помещается в оперативную память. Гранулы индекса представляют собой наименьшую пачку данных, обрабатываемых во время SELECT-запросов.

Подготовка инструментов

Чтобы упростить выполнение практической работы, подключаться к веб-интерфейсу ClickHouse можно с помощью через балансировщик нагрузки с публичным IP-адресом. Чтобы обезопасить инсталляцию, подключение будет разрешено только с определенного IP-адреса. Узнайте свой внешний IP-адрес, например, с помощью сервиса MyIP.ru.

Подготовка данных

В практической работе будут использоваться данные из публичного датасета. Его можно скачать по ссылке.

Для работы понадобятся два файла:

  • movies.csv

  • ratings.csv

Подготовка инфраструктуры

Для выполнения практической работы понадобятся:

  • Object Storage Service (OBS), в который будут загружены файлы с данными;

  • кластер MapReduce Service (MRS) c развернутым ClickHouse, в котором будут храниться и обрабатываться данные.

Создание бакета OBS

Исходные данные нужно импортировать в ClickHouse. Для этого их сначала нужно поместить в хранилище, доступное используемым в практической работе сервисам. В Advanced есть сервис хранения и управления объектным хранилищем данных — Object Storage Service (OBS).

Чтобы создать OBS-бакет:

  1. Войдите в консоль управления Advanced:

  2. Чтобы открыть список сервисов, нажмите Service List.

  3. Выберите Management & Deployment → Application Operation Management.

  4. В правом верхнем углу нажмите Create Bucket.

  5. Data Redundancy Policy — для практической работы не требуется мультизональное хранилище (Multi-AZ storage), поэтому выберите Single-AZ storage.

  6. Bucket Name — задайте название бакета.

  7. Storage Class — выберите класс хранения Standard.

  8. Bucket Policy — выберите Private.

    Примечание

    • Private — только владелец может смотреть, изменять и удалять объекты бакета.

    • Public Read — любой пользователь может просматривать содержимое бакета. Только владелец имеет право изменять и удалять объекты бакета.

    • Public Read and Write — любой пользователь может смотреть, изменять и удалять объекты бакета.

  9. Default Encryption — выберите Disable. Для выполнения практической работы шифрование объектов в бакете не требуется.

  10. Enterprise Project — выберите Enterprise-проект, в котором будет создан бакет.

  11. Нажмите Create Now.

После создания бакет появится в общем списке.

Загрузка исходных данных в бакет

Чтобы загрузить CSV-файлы с исходными данными в бакет:

  1. Нажмите на название созданного бакета.

  2. В левом меню перейдите в раздел Objects.

  3. Нажмите Upload Object.

  4. Перетащите файлы movies.csv и ratings.csv в окно загрузки или нажмите add file и выберите их в проводнике.

  5. Нажмите Upload.

Дождитесь окончания загрузки файлов. Когда файлы будут загружены, они появятся в списке.

Создание виртуальной сети

Для работы MRS нужно создать виртуальную сеть, в которую будут добавлены серверы кластера. Чтобы создать сеть:

  1. В левом верхнем углу консоли управления нажмите Homepage.

  2. В списке всех сервисов выберите Virtual Private Cloud.

  3. В правом верхнем углу нажмите Create VPC.

  4. В поле Name задайте название виртуальной сети.

  5. IPv4 CIDR Block — адресное пространство сети. Укажите 192.168.0.0/24.

  6. Enterprise Project — выберите тот же Enterprise-проект, в котором находится ранее созданный бакет.

  7. В поле Name в разделе Default Subnet задайте название подсети.

  8. Нажмите Create Now.

Виртуальная сеть создана.

Создание кластера MapReduce

MapReduce Service — надежная, безопасная и простая в использовании платформа корпоративного уровня для хранения, обработки и анализа больших данных. MRS основан на:

  • программном обеспечении с открытым исходным кодом Hadoop;

  • вычислительном механизме Spark in-memory;

  • колоночной СУБД ClickHouse;

  • распределенной базе данных HBase и хранилище Hive.

Чтобы создать кластер MapReduce:

  1. В левом верхнем углу консоли управления нажмите Homepage.

  2. Выберите сервис MapReduce Service.

  3. В правом верхнем углу нажмите Buy Cluster.

  4. В поле Cluster Name задайте название кластера.

  5. Cluster Version — выберите версию 3.1.0.

  6. Component — выберите ClickHouse Cluster.

  7. AZ — выберите зону доступности, в которой будет развернут кластер ClickHouse.

  8. VPC — выберите виртуальную сеть, которую создали ранее.

  9. Subnet — выберите подсеть, которая была создана вместе с виртуальной сетью.

  10. Enterprise Project — выберите Enterprise-проект, в котором будет создан кластер.

  11. Cluster Node — выберите конфигурацию узлов кластера. Для практической работы достаточно самых простых виртуальных машин, доступных для выбора. Чтобы сменить конфигурацию, нажмите на Редактирование настроек.

  12. Отключите параметр Kerberos Authentication. Чтобы подтвердить действие, нажмите Confirm.

  13. Задайте пароль в поле Password и введите его повторно в Confirm Password.

  14. Активируйте параметр Secure Communications.

  15. Нажмите Buy Now.

В течение нескольких минут кластер будет создан, а его статус изменится на Running. Проверить статус можно в разделе Clusters → Active Clusters.

Создание балансировщика нагрузки

На мастерах кластера MapReduce будет развернут сервис ClickHouseBalancer, который будет перенаправлять запросы с мастеров кластера на конечные серверы ClickHouse.

В рамках практической работы будет создан балансировщик нагрузки, бэкендом которого будут выступать сервисы ClickHouseBalancer на двух мастер-узлах кластера.

По умолчанию веб-интерфейс ClickHouse доступен по порту 8123, но сервис ClickHouseBalancer публикует порт 21425.

Создайте группу IP-адресов, а затем балансировщик нагрузки. В группу IP-адресов входят адреса, которые будут использовать одни и те же правила групп безопасности. Чтобы создать группу:

  1. В левом верхнем углу консоли управления нажмите Homepage.

  2. В списке сервисов выберите Elastic Load Balance.

  3. В меню слева перейдите в раздел Elastic Load Balance → IP Address Groups.

  4. В правом верхнем углу нажмите Create IP Address Group.

  5. В поле Name задайте название группы IP-адресов.

  6. В поле Enterprise Project выберите проект, к которому будет относиться группа адресов.

  7. В поле IP Address введите ваш IP-адрес, который можно узнать с помощью сервиса MyIP.ru.

  8. Нажмите OK.

Группа IP-адресов создана.

Чтобы создать балансировщик нагрузки:

  1. В левом верхнем углу консоли управления нажмите Homepage.

  2. В списке сервисов выберите Elastic Load Balance.

  3. В правом верхнем углу нажмите Create Elastic Load Balancer.

  4. Type — выберите Shared.

  5. Network Type — выберите Public network.

  6. VPC — выберите виртуальную сеть, которую создали ранее.

  7. Subnet — выберите подсеть, которую создали ранее.

  8. EIP — выберите New EIP.

  9. Bandwidth — выберите полосу пропускания 10 Мбит/с.

  10. В поле Name задайте название балансировщика нагрузки.

  11. Enterprise Project — выберите тот же Enterprise-проект, который использовали ранее.

  12. Нажмите Create Now.

  13. Нажмите Submit.

Балансировщик создан. Чтобы его настроить:

  1. Нажмите на название созданного балансировщика.

  2. Перейдите на вкладку Listeners.

  3. Нажмите Add Listeners.

  4. В поле Name задайте название слушателя.

  5. Frontend Protocol/Port — выберите HTTP и введите порт 80.

  6. Раскройте меню Advanced Settings.

  7. В списке Access Policy выберите Whitelist.

  8. В списке IP Address Group выберите созданную ранее группу IP-адресов.

  9. Нажмите Next.

  10. В поле Name задайте название группы серверов, между которыми будет распределяться трафик.

  11. Нажмите Finish.

  12. Нажмите add servers to the newly created backend server group.

  13. Чтобы добавить серверы в бэкенд-группу, нажмите Add.

  14. В списке выберите те виртуальные машины, в названии которых есть node_master.

  15. Нажмите Next.

  16. Для каждого сервера в столбце Backend Port добавьте порт 21425.

  17. Нажмите Finish.

Балансировщик нагрузки создан и настроен.

Обновление группы безопасности MapReduce

Чтобы настроенный балансировщик корректно работал с серверами MapReduce, нужно перенастроить группу безопасности, которая была создана вместе с кластером MapReduce. Для этого:

  1. В левом верхнем углу консоли управления нажмите Homepage.

  2. В списке сервисов выберите Virtual Private Cloud.

  3. В меню слева перейдите в раздел Access Control → Security Groups.

  4. Нажмите на название группы безопасности, название которой соответствует маске: mrs_{название кластера MRS}_{случайные символы}.

  5. Перейдите на вкладку Inbound Rules.

  6. Нажмите Add Rule.

  7. В поле Priority введите 1.

  8. Protocol & Port — выберите TCP и введите порт 21425.

  9. В колонке Source введите подсеть балансировщиков нагрузки — 100.125.0.0/16.

  10. Нажмите OK.

Правила группы безопасности MapReduce обновлены.

Создание Access Keys

Для подключения к OBS из ClickHouse нужно создать ключи для доступа по API. Для этого:

  1. В правом верхнем углу консоли управления Advanced нажмите на имя пользователя и выберите My Credentials.

  2. В меню слева перейдите в раздел Access Keys.

  3. Нажмите Create Access Keys.

  4. При необходимости введите описание ключа в поле Description и нажмите OK.

  5. Чтобы скачать файл с ключами, нажмите Download. Ключи можно скачать только один раз после их создания.

Инфраструктура готова к выполнению работы с данными.

Работа с данными

Работа с ClickHouse будет выполняться через веб-интерфейс, который доступен по IP-адресу балансировщика нагрузки. Чтобы узнать нужный IP-адрес:

  1. В левом верхнем углу консоли управления нажмите Homepage.

  2. В списке сервисов выберите Elastic Load Balance.

  3. Нажмите на название созданного балансировщика.

  4. В поле IP Address → IPv4 EIP скопируйте IP-адрес.

  5. Перейдите в браузере по ссылке: http://{публичный IP-адрес балансировщика}/play. На экране появится окно:

    Веб-интерфейс ClickHouse

Чтобы выполнить запрос, введите его в текстовое поле и нажмите Run.

Первый эксперимент

В рамках первого эксперимента будут созданы две таблицы — таблица с фильмами и таблица с рейтингами. Для решения задачи будет использоваться запрос с JOIN. Планирование хранения данных будет намеренно не оптимальным и в качестве первичного ключа будет задано случайное число.

В первом эксперименте будет использоваться движок таблиц «ReplicatedMergeTree». Это значит, что полная копии данных будут храниться на каждом из серверов ClickHouse, но запрос будет полностью выполняться только на одном из них.

Чтобы создать таблицы, выполните запросы по очереди:

  1. create database if not exists movies1 on cluster default_cluster
    
  2. create table movies1.movies on cluster default_cluster (
       `movieId` UInt32,
       `title` String,
       `genres` String
    )
    ENGINE = ReplicatedMergeTree
    PRIMARY KEY tuple()
    
  3. create table movies1.ratings on cluster default_cluster (
       `movieId` UInt32,
       `userId` UInt32,
       `rating` DECIMAL(3,2),
       `timestamp` DateTime
    )
    ENGINE = ReplicatedMergeTree
    PRIMARY KEY tuple()
    
  4. insert into movies1.movies
    select *
    from s3(
       'https://bucket-name.obs.ru-moscow-1.hc.sbercloud.ru/movies.csv',
       'Ключ Access Key',
       'Ключ Secret Key',
       'CSVWithNames', "
          `movieId` UInt32,
          `title` String,
          `genres` String"
       )
    SETTINGS format_csv_allow_single_quotes=false
    

    В коде этого запроса замените:

    • bucket-name на название бакета, которое вы указали при его создании;

    • Ключ Access Key на Access Key из файла credentials.csv;

    • Ключ Secret Key на Secret Key из файла credentials.csv.

  5. insert into movies1.ratings
    select movieId, userId, rating, FROM_UNIXTIME(timestamp)
    from s3(
       'https://bucket-name.obs.ru-moscow-1.hc.sbercloud.ru/ratings.csv',
       'Ключ Access Key',
       'Ключ Secret Key',
       'CSVWithNames', "
          `userId` UInt32,
          `movieId` UInt32,
          `rating` DECIMAL(3,2),
          `timestamp` UInt32"
       )
    SETTINGS format_csv_allow_single_quotes=false
    

    В коде этого запроса замените:

    • bucket-name на название бакета, которое вы указали при его создании;

    • Ключ Access Key на Access Key из файла credentials.csv;

    • Ключ Secret Key на Secret Key из файла credentials.csv.

Так как используется движок ReplicatedMergeTree и добавление данных в таблицы не планируется, выполните принудительную оптимизацию таблиц с помощью запросов:

  1. OPTIMIZE TABLE movies1.movies on cluster default_cluster FINAL
    
  2. OPTIMIZE TABLE movies1.ratings on cluster default_cluster FINAL
    

Затем выполните запрос, который выведет список фильмов и их рейтинг:

select m.title, avg(r.rating) as avg_rating from movies1.movies as m left join movies1.ratings as r on m.movieId = r.movieId group by m.title

Время выполнения запроса при разработке практической работы составило 2,1 с. При выполнении запроса ClickHouse прочитал 20 млн строк: Elapsed: 2.086 sec, read 20027541 rows..

Второй эксперимент

В этом случае будет воссоздан первый эксперимент, но, чтобы оптимизировать хранение данных, в качестве первичного ключа будет указан идентификатор фильма.

Чтобы создать таблицы, выполните запросы:

  1. create database if not exists movies2 on cluster default_cluster
    
  2. create table movies2.movies on cluster default_cluster (
       `movieId` UInt32,
       `title` String,
       `genres` String
    )
    ENGINE = ReplicatedMergeTree
    PRIMARY KEY (movieId)
    ORDER BY (movieId)
    
  3. create table movies2.ratings on cluster default_cluster (
       `movieId` UInt32,
       `userId` UInt32,
       `rating` DECIMAL(3,2),
       `timestamp` DateTime
    )
    ENGINE = ReplicatedMergeTree
    PRIMARY KEY (movieId)
    ORDER BY (movieId)
    
  4. insert into movies2.movies
    select *
    from s3(
    'https://bucket-name.obs.ru-moscow-1.hc.sbercloud.ru/movies.csv',
    'Ключ Access Key',
    'Ключ Secret Key',
    'CSVWithNames', "
       `movieId` UInt32,
       `title` String,
       `genres` String"
    )
    SETTINGS format_csv_allow_single_quotes=false
    

    В коде этого запроса замените:

    • bucket-name на название бакета, которое вы указали при его создании;

    • Ключ Access Key на Access Key из файла credentials.csv;

    • Ключ Secret Key на Secret Key из файла credentials.csv.

  5. insert into movies2.ratings
    select movieId, userId, rating, FROM_UNIXTIME(timestamp)
    from s3(
       'https://bucket-name.obs.ru-moscow-1.hc.sbercloud.ru/ratings.csv',
       'Ключ Access Key',
       'Ключ Secret Key',
       'CSVWithNames', "
          `userId` UInt32,
          `movieId` UInt32,
          `rating` DECIMAL(3,2),
          `timestamp` UInt32"
       )
    SETTINGS format_csv_allow_single_quotes=false
    

    В коде этого запроса замените:

    • bucket-name на название бакета, которое вы указали при его создании;

    • Ключ Access Key на Access Key из файла credentials.csv;

    • Ключ Secret Key на Secret Key из файла credentials.csv.

В этом случае также используется движок ReplicatedMergeTree и добавление данных в таблицы не планируется. Поэтому выполните принудительную оптимизацию таблиц с помощью запросов:

  1. OPTIMIZE TABLE movies2.movies on cluster default_cluster FINAL
    
  2. OPTIMIZE TABLE movies2.ratings on cluster default_cluster FINAL
    

Затем выполните запрос, который выведет список фильмов и их рейтинг:

select m.title, avg(r.rating) as avg_rating from movies2.movies as m left join movies2.ratings as r on m.movieId = r.movieId group by m.title

Время выполнения запроса при разработке практической работы составило 1,3 с. При выполнении запроса ClickHouse прочитал 20 млн строк: Elapsed: 1.327 sec, read 20027541 rows..

Третий эксперимент

В этом случае данные будут сохранены в денормализованном виде — одной плоской таблицей. Движок таблиц останется тем же — ReplicatedMergeTree.

Чтобы создать таблицы, выполните запросы:

  1. create database if not exists movies3 on cluster default_cluster
    
  2. create table movies3.ratings on cluster default_cluster (
       `movieId` UInt32,
       `title` String,
       `genres` String,
       `userId` UInt32,
       `rating` DECIMAL(3,2),
       `timestamp` DateTime
    )
    ENGINE = ReplicatedMergeTree
    PRIMARY KEY tuple()
    
  3. insert into movies3.ratings
    select M.movieId, M.title, M.genres, R.userId, R.rating, FROM_UNIXTIME(R.timestamp)
    from s3(
       'https://bucket-name.obs.ru-moscow-1.hc.sbercloud.ru/movies.csv',
       'Ключ Access Key',
       'Ключ Secret Key',
       'CSVWithNames', "
          `movieId` UInt32,
          `title` String,
          `genres` String"
    ) as M left join
    s3(
    'https://bucket-name.obs.ru-moscow-1.hc.sbercloud.ru/ratings.csv',
    'Ключ Access Key',
    'Ключ Secret Key',
    'CSVWithNames', "
       `userId` UInt32,
       `movieId` UInt32,
       `rating` DECIMAL(3,2),
       `timestamp` UInt32"
    ) as R on M.movieId = R.movieId
    SETTINGS format_csv_allow_single_quotes=false
    

    В коде этого запроса замените:

    • bucket-name на название бакета, которое вы указали при его создании;

    • Ключ Access Key на Access Key из файла credentials.csv;

    • Ключ Secret Key на Secret Key из файла credentials.csv.

В этом случае также используется движок ReplicatedMergeTree и добавление данных в таблицы не планируется. Поэтому выполните принудительную оптимизацию таблиц с помощью запроса:

OPTIMIZE TABLE movies3.ratings on cluster default_cluster FINAL

Затем выполните запрос, который выведет список фильмов и их рейтинг:

select title, avg(rating) as avg_rating from movies3.ratings group by title

Время выполнения запроса при разработке практической работы составило 0,092 с. При выполнении запроса ClickHouse прочитал 20 млн строк: Elapsed: 0.092. sec, read 20000797 rows..

Чтобы вывести данные только по одному фильму, выполните команду:

select title, avg(rating) as avg_rating, count(*) as number_of_ratings from movies3.ratings where movieId=123 group by title

Время выполнения запроса при разработке практической работы составило 0,004 с. При выполнении запроса ClickHouse прочитал 20 млн строк: Elapsed: 0.004. sec, read 20000797 rows..

Даже для поиска информации по одному фильму ClickHouse все равно считал все записи.

Четвертый эксперимент

Чтобы оптимизировать хранение данных и еще быстрее получать рейтинг фильмов, в качестве первичного ключа будет указан идентификатор фильма.

Выполните по очереди команды:

  1. create database if not exists movies4 on cluster default_cluster
    
  2. create table movies4.ratings on cluster default_cluster (
       `movieId` UInt32,
       `title` String,
       `genres` String,
       `userId` UInt32,
       `rating` DECIMAL(3,2),
       `timestamp` DateTime
    )
    ENGINE = ReplicatedMergeTree
    PRIMARY KEY (movieId)
    ORDER BY (movieId, timestamp)
    
  3. insert into movies4.ratings
    select M.movieId, M.title, M.genres, R.userId, R.rating, FROM_UNIXTIME(R.timestamp)
    from s3(
       'https://bucket-name.obs.ru-moscow-1.hc.sbercloud.ru/movies.csv',
       'Ключ Access Key',
       'Ключ Secret Key',
       'CSVWithNames', "
          `movieId` UInt32,
          `title` String,
          `genres` String"
       ) as M left join
       s3(
       'https://bucket-name.obs.ru-moscow-1.hc.sbercloud.ru/ratings.csv',
       'Ключ Access Key',
       'Ключ Secret Key',
       'CSVWithNames', "
          `userId` UInt32,
          `movieId` UInt32,
          `rating` DECIMAL(3,2),
          `timestamp` UInt32"
       ) as R on M.movieId = R.movieId
    SETTINGS format_csv_allow_single_quotes=false
    

    В коде этого запроса замените:

    • bucket-name на название бакета, которое вы указали при его создании;

    • Ключ Access Key на Access Key из файла credentials.csv;

    • Ключ Secret Key на Secret Key из файла credentials.csv.

В этом случае тоже используется движок ReplicatedMergeTree, а добавлять данные в таблицы не планируется. Поэтому выполните принудительную оптимизацию таблиц с помощью запроса:

OPTIMIZE TABLE movies4.ratings on cluster default_cluster FINAL

Чтобы вывести данные только по одному фильму, выполните команду:

select title, avg(rating) as avg_rating, count(*) as number_of_ratings from movies4.ratings where movieId=123 group by title

Запрос был выполнен быстрее — за 0,001 с, а количество прочитанных строк уменьшилось — 8192: Elapsed: 0.001 sec, read 8192 rows..

Запустили Evolution free tier
для Dev & Test
Получить