Организуем платформу обработки потоковых данных из Kafka, Spark и Greenplum

Привет, Хабр!

Меня зовут Иван Хозяинов, а работаю в ITSumma, где изучаю и применяю технологии, связанные с большими данными, машинным обучением и аналитикой. В этой статье хочу рассказать о системе хранения и обработки данных и инструментах, которые встречаются на пути от сырых исходников до представления, удобного для последующего анализа.

Поговорим, как связаны серверы в дата-центре и распределенные приложения для обработки данных и почему пришлось написать свой коннектор для Spark и Greenplum.

Какую задачу решаем

Обычно задача платформы обработки потоковых данных состоит в том, чтобы собрать, сохранить и провести аналитику данных, используя их в дальнейшем для бизнес-аналитики, машинного обучения и т.п. Причём:

  • Источников данных несколько (5-10), и они разнородные, например: журналы, логи, данные датчиков, телеметрия. Данные из этих источников через интернет попадают на приемник данных, через шину данных передаются в обработчик, записываются и попадают в хранилище. В этой статье уделим особое внимание как раз путям наполнения СХД.

  • Каждый источник генерирует порядка 500-1500 RPS. Чаще всего это структурированные пакеты данных в avro, json или бинарном формате. Но формат у разных датчиков отличается, соответственно, нужно уметь парсить входящие пакеты, чтобы уже потом записывать в СХД и обрабатывать.

  • Система хранения данных подключается к системам аналитики (BI) или ML-моделям для обучения на исторических данных.

Требования к железу всегда индивидуальны, но чаще всего начать можно с базовой конфигурации:

  • RAM: 32G на каждый сервер;

  • CPU: 8 ядер на каждый сервер;

  • HDD+SSD: 16 TБайт суммарно;

  • Сеть: внутренняя сеть с пропускной способностью не менее 1 Гбит/с.

А для прототип подойдёт и что-нибудь попроще: например, dedicated-серверы, где можно разворачивать виртуальные машины. Прежде чем масштабировать систему сбора, хранения и обработки данных, лучше её проверить в небольшой песочнице. Мы чаще всего работаем с виртуализацией KVM, но бывает OpenStack в связке с Ceph. А когда всё отработано — можно переходить и к работе на кластерах.

Архитектура платформы

Сейчас мы используем примерно такую схему:

На схеме можно условно выделить два основных блока: ПО, которое обрабатывает данные и складывает их в распределенную систему хранения данных, и системы обеспечения CI/CD.

Путь данных от источников до хранилища и дальнейших аналитических запросов, имеющих бизнес-ценность, включает следующие этапы:

  • Данные из источников попадают в шину данных. В качестве шины используется Apache Kafka.

  • Собранные Kafka данные из различных источников попадают в Spark и там обрабатываются.

  • Обработанные данные записываются в Greenplum.

  • После этого с данными могут работать аналитики и ML-системы.

Дальше я постараюсь раскрыть, почему выбран тот или иной продукт и какие рассматривали альтернативы. Например, для работы с данными сейчас используем Greenplum (продукт Pivotal), но ещё пробовали использовать Hadoop с HDFS и поверх этого Hive. Однако Greenplum показался быстрее, проще и у него открытый исходный код. Единственная проблема с Pivotal — как и Confluent, они не распространяют и не поддерживают свои продукты в РФ. Поэтому чтобы связать Spark с Greenplum, понадобится использовать что-нибудь опенсорсное (какие есть варианты — обсудим ниже).

Каждый узел платформы обработки данных мониторится. Для этого используем связку Prometheus + Grafana, а также свою систему мониторинга серверных параметров, по которым администраторы реагируют на инциденты раньше, чем наступили бы серьезные последствия.

За обеспечение непрерывной доставки отвечает следующий конвейер:

  • коммит попадает в GitLab;

  • Jenkins запускает сборщик продукта;

  • запуск собранного задания производится в Apache Spark;

  • статический анализатор SonarQube проверяет ошибки;

  • с помощью JMeter производится нагрузочное тестирование собранного задания.

Версии компонентов

К сожалению, нельзя просто взять последние версии всех компонентов — они могут не заработать вместе, и прощай ночные сборки. Поэтому привожу наш набор совместимости:

Apache Spark — 2.4.6

Java (OpenJDK) — 1.8.0

Apache Kafka — 2.4.1

Pivotal Greenplum — 6.9.0

PostgreSQL — 12.2

Apache JMeter — 5.2.1

Apache Ignite — 2.8.1

Jenkins — 2.235.5-lts

GitLab — 13.6.3

Sonarqube — 8.4.1

Grafana — 6.7

Prometheus — 2.17.0

Нагрузочное тестирование

Для того чтобы оценить, насколько построенная схема жизнеспособна, проводим нагрузочное тестирование, которое включает в себя следующие шаги.

  • Генерация правдоподобных данных —чтобы наполнить хранилище данными и проверить дальнейшие этапы, нужно создать профиль нагрузки, похожий на реальный. Это могут быть фейковые журналы с сервера, сгенерированные данные с датчиков и т.д.

  • Проверка на разных профилях нагрузки — чтобы узнать поведение при пиковой, повышенной и равномерной нагрузке и выявить компоненты, требующие масштабирования.

  • Анализ узких мест и их масштабирование — например, может понадобиться масштабировать БД, добавить узлы в Greenplum или ресурсы для обработчика данных.

  • Оценка пропускной способности — чтобы выяснить, как будет работать данная система целиком, насколько она пригодна к реальной нагрузке. Поэтому убеждаемся, что нет никаких аномалий: CPU не скачет, память не утекает и в целом решение готово к продакшену.

Также нагрузочное тестирование может быть частью CI/CD-процесса, тогда мы будем уверены в работоспособности системы после каждого деплоя, а не только после штучной проверки по списку.

Нагрузочное тестирование Яндекс.Танком

Первым вариантом, как проводить нагрузочное тестирование, была идея тестировать платформу обработки данных как веб-сайт — с помощью Яндекс.Танка: сделать микросервис, который принимает данные и посылает их в Kafka, и его тестировать. Для этого нужно предварительно подготовить нагрузку («патроны» для танка), чтобы трассировать данные. Это занимает некоторое время, а также оперативную память и место на диске.

В целом удобно — результаты и графики, как проходит нагрузка, можно посмотреть в облачном интерфейсе. Минус в том, что этот способ подходит только для HTTP-подобных протоколов и нагрузить можно только веб-ендпойнты, которые смотрят наружу.

Нагрузочное тестирование с Apache JMeter

Другой продукт, который можно использовать для нагрузочного тестирования, — это Apache JMeter. Он больше подходит для тестирования именно распределенных систем, потому что входит в экосистему Apache Software Foundation: для него есть плагины для прямой нагрузки на Kafka; можно нагружать не только веб-интерфейс, но и сразу шину данных; можно пробовать напрямую тестировать БД и есть поддержка сообщества.

Преимущества JMeter в итоге оказались решающими — на нём и остановились. Но перед этим свой велосипед тоже попробовали написать.

Нагрузочное тестирование самописным генератором

Если мы хотим обеспечить профилирование всего конвейера обработки данных от источников до СХД с учётом транспорта и обработки, то нужно иметь возможность трассировать и определить ёмкость каждого участка пути.

Один из подходов, который используется в Zipkin и Jaeger, состоит в генерации огромного количества пакетов данных с уникальными идентификаторами, создании с их помощью нагрузки и отслеживании всех этапов за счёт trace_id. При этом на каждом участке можно сделать обработчик и измерить, за сколько данные проходят участок пути от источника до обработчика, от обработчика до БД и т.д.

Общая схема понятна и позволяет подробно оценить возможности системы обработки данных, но чаще всего это избыточный метод, который требует дополнительных усилий и ресурсов. Обычно лучше использовать JMeter.

Брокер сообщений: Apache Kafka vs RabbitMQ

Выбрали Apache Kafka благодаря широким возможностям именно в контексте работы с большими данными и нагрузками, такими как хранение журнала, масштабируемость и достоверность. Более подробно о сравнении RabbitMQ и Apache Kafka можно прочитать в отдельной статье.

Apache Spark vs Flink

Для того чтобы выбрать один из этих фреймворков, решили попробовать оба. Подробное сравнение проводили 3 года назад, и тогда бросилось в глаза, что документации, примеров больше у Apache Spark, да и в целом распространенность у него больше. Но проведенное нагрузочное тестирование простых примеров, написанных на обоих фреймворках, показало лучшие результаты у Apache Flink. Например, Игорь Кураленок из “Яндекс.Облако” на недавнем митапе упоминал, что для потоковых данных рекомендуется использовать Apache Flink. Поэтому, судя по всему, этот фреймворк тоже зрелый и его можно использовать в разработке.

Но мы сделали выбор в пользу Apache Spark.

Требования к СХД

Три самых важных параметра, которые нужно оценить и учесть, составляя требования к СХД:

  • время восстановления после сбоя (RTO);

  • пропускная способность на запись (для потоковых данных из разнородных источников);

  • пропускная способность на чтение (для аналитики и машинного обучения).

Сложность работы СХД в случае обработки больших объемов потоковых данных в том, что нужно одновременно и быстро писать, и читать. Аналитики обычно хотят работать со свежими данными, желательно, с теми, которые только что пришли, — чтобы сразу что-то смотреть и делать свои аналитические выводы. Нагрузка на СХД при этом возрастает, и нужно следить, к каким репликам должны подключаться аналитики, и вообще, должны ли они подключаться к репликам или лучше работать напрямую.

Единственное требование с точки зрения железа — держать серверы в одном дата-центре.

К сожалению, точные параметры назвать трудно, они зависят от каждого конкретного случая и отличаются для разных объемов данных и выполняемых задач. Иногда может быть достаточно 4-х выделенных серверов, иногда нужно 12. В каких-то случаях нужны большие диски, а в каких-то лучше шардирование и диски по 1-2 ТБайт. Бывает, что общий объем хранилища измеряется петабайтами, но для нас более привычны объемы в несколько десятков терабайт.

Холодные хранилища данных

Пробовали работать с разными холодными хранилищами, в том числе, с Apache Hadoop и Cassandra.

Наш опыт работы с Cassandra небольшой, использовать её напрямую в системе аналитики сложнее. Это более медленная система и лучше подходит для долговременных хранилищ. Мы же говорим о быстрой обработке больших объемов данных — как структурированных, так и неструктурированных. Они попадают в озеро данных на Hadoop и хранятся там в чистом виде, пока не понадобятся, а потом уже передаются в тот же Greenplum и там обрабатываются.

В работе с Apache Hadoop использовали такую систему:

  • Apache Ambari для развертывания и управления. Иногда разворачиваем Ansible-скриптами — так больше степеней свободы, но зачастую через интерфейс управлять хранилищем проще, а у Ambari он удобный и хорошо всё разворачивается.

  • Дублирование namenode (активное и пассивное) — чтобы обеспечить high availability.

  • Репликация datanode с фактором репликации по умолчанию 3, но если ресурсов не хватает, то реплика-фактор может быть и 2.

  • Поверх HDFS Hive для аналитики и SQL-запросов и HBase как альтернатива Cassandra.

Pivotal Greenplum

Как уже писал ранее, в контексте работы с аналитикой остановились на Greenplum. High availability обеспечиваем за счет:

  • дублирования мастера — аналог namenode в Hadoop;

  • зеркалирования сегментов — аналог datanode в Hadoop, который представляет собой просто сервер в PostgreSQL;

  • RAID на железном уровне — это единственная рекомендация по железу, во всём остальном говорим про софт.

Spark+Greenplum = ?

Один из ключевых моментов статьи: каким образом можно обработать потоковые данные и записать их в систему хранения данных, которая сделана на Greenplum? Как уже говорил, несмотря на то, что Greenplum — продукт с открытым исходным кодом, вся обвязка и поддержка требует лицензии и контакта с западным рынком. Сейчас в России продукт не поддерживается.

Чтобы связать Spark и Greenplum, есть несколько вариантов:

  • Можно скачать коннектор Spark- Greenplum. Но в нём нет исходного кода (просто документация) и периодически нет доступа. На письма тоже отвечают не всегда, и непонятно, с чем это связано.

  • Есть opensource-библиотека, которая быстрее JDBC в несколько раз — в readme написано, что в 100 раз, если записывать напрямую. Но у неё нет интерфейса, который поддерживался бы самим Spark.

  • Мы сделали и совсем недавно выложили в opensource на GitHub свой коннектор Spark и Greenplum.

Наш Spark Greenplum connector распространяется под лицензией MIT, поддерживает DataSource API v2 Spark (только для Spark v2). Сейчас он ещё в альфа-версии и работает только на чтение, в дальнейшем доработаем и на запись, но пока, как и в предыдущем варианте, можно записывать при помощи JDBC-запросов через копирование. Над кодом работали несколько человек, но последнюю сборку и доработку интерфейса осуществлял @hovercraft-habr, и он сможет подробнее ответить на вопросы про технические тонкости.

Если статья вас заинтересовала, то, возможно, вы захотите попробовать этот коннектор. Мы с коллегами с удовольствием ответим на вопросы и учтём комментарии!

Let’s block ads! (Why?)

Read More

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *