Организуем платформу обработки потоковых данных из Kafka, Spark и Greenplum
Привет, Хабр!
Меня зовут Иван Хозяинов, а работаю в ITSumma, где изучаю и применяю технологии, связанные с большими данными, машинным обучением и аналитикой. В этой статье хочу рассказать о системе хранения и обработки данных и инструментах, которые встречаются на пути от сырых исходников до представления, удобного для последующего анализа.
Поговорим, как связаны серверы в дата-центре и распределенные приложения для обработки данных и почему пришлось написать свой коннектор для Spark и Greenplum.
Какую задачу решаем
Обычно задача платформы обработки потоковых данных состоит в том, чтобы собрать, сохранить и провести аналитику данных, используя их в дальнейшем для бизнес-аналитики, машинного обучения и т.п. Причём:
-
Источников данных несколько (5-10), и они разнородные, например: журналы, логи, данные датчиков, телеметрия. Данные из этих источников через интернет попадают на приемник данных, через шину данных передаются в обработчик, записываются и попадают в хранилище. В этой статье уделим особое внимание как раз путям наполнения СХД.
-
Каждый источник генерирует порядка 500-1500 RPS. Чаще всего это структурированные пакеты данных в avro, json или бинарном формате. Но формат у разных датчиков отличается, соответственно, нужно уметь парсить входящие пакеты, чтобы уже потом записывать в СХД и обрабатывать.
-
Система хранения данных подключается к системам аналитики (BI) или ML-моделям для обучения на исторических данных.
Требования к железу всегда индивидуальны, но чаще всего начать можно с базовой конфигурации:
-
RAM: 32G на каждый сервер;
-
CPU: 8 ядер на каждый сервер;
-
HDD+SSD: 16 TБайт суммарно;
-
Сеть: внутренняя сеть с пропускной способностью не менее 1 Гбит/с.
А для прототип подойдёт и что-нибудь попроще: например, dedicated-серверы, где можно разворачивать виртуальные машины. Прежде чем масштабировать систему сбора, хранения и обработки данных, лучше её проверить в небольшой песочнице. Мы чаще всего работаем с виртуализацией KVM, но бывает OpenStack в связке с Ceph. А когда всё отработано — можно переходить и к работе на кластерах.
Архитектура платформы
Сейчас мы используем примерно такую схему:
На схеме можно условно выделить два основных блока: ПО, которое обрабатывает данные и складывает их в распределенную систему хранения данных, и системы обеспечения CI/CD.
Путь данных от источников до хранилища и дальнейших аналитических запросов, имеющих бизнес-ценность, включает следующие этапы:
-
Данные из источников попадают в шину данных. В качестве шины используется Apache Kafka.
-
Собранные Kafka данные из различных источников попадают в Spark и там обрабатываются.
-
Обработанные данные записываются в Greenplum.
-
После этого с данными могут работать аналитики и ML-системы.
Дальше я постараюсь раскрыть, почему выбран тот или иной продукт и какие рассматривали альтернативы. Например, для работы с данными сейчас используем Greenplum (продукт Pivotal), но ещё пробовали использовать Hadoop с HDFS и поверх этого Hive. Однако Greenplum показался быстрее, проще и у него открытый исходный код. Единственная проблема с Pivotal — как и Confluent, они не распространяют и не поддерживают свои продукты в РФ. Поэтому чтобы связать Spark с Greenplum, понадобится использовать что-нибудь опенсорсное (какие есть варианты — обсудим ниже).
Каждый узел платформы обработки данных мониторится. Для этого используем связку Prometheus + Grafana, а также свою систему мониторинга серверных параметров, по которым администраторы реагируют на инциденты раньше, чем наступили бы серьезные последствия.
За обеспечение непрерывной доставки отвечает следующий конвейер:
-
коммит попадает в GitLab;
-
Jenkins запускает сборщик продукта;
-
запуск собранного задания производится в Apache Spark;
-
статический анализатор SonarQube проверяет ошибки;
-
с помощью JMeter производится нагрузочное тестирование собранного задания.
Версии компонентов
К сожалению, нельзя просто взять последние версии всех компонентов — они могут не заработать вместе, и прощай ночные сборки. Поэтому привожу наш набор совместимости:
Apache Spark — 2.4.6
Java (OpenJDK) — 1.8.0
Apache Kafka — 2.4.1
Pivotal Greenplum — 6.9.0
PostgreSQL — 12.2
Apache JMeter — 5.2.1
Apache Ignite — 2.8.1
Jenkins — 2.235.5-lts
GitLab — 13.6.3
Sonarqube — 8.4.1
Grafana — 6.7
Prometheus — 2.17.0
Нагрузочное тестирование
Для того чтобы оценить, насколько построенная схема жизнеспособна, проводим нагрузочное тестирование, которое включает в себя следующие шаги.
-
Генерация правдоподобных данных —чтобы наполнить хранилище данными и проверить дальнейшие этапы, нужно создать профиль нагрузки, похожий на реальный. Это могут быть фейковые журналы с сервера, сгенерированные данные с датчиков и т.д.
-
Проверка на разных профилях нагрузки — чтобы узнать поведение при пиковой, повышенной и равномерной нагрузке и выявить компоненты, требующие масштабирования.
-
Анализ узких мест и их масштабирование — например, может понадобиться масштабировать БД, добавить узлы в Greenplum или ресурсы для обработчика данных.
-
Оценка пропускной способности — чтобы выяснить, как будет работать данная система целиком, насколько она пригодна к реальной нагрузке. Поэтому убеждаемся, что нет никаких аномалий: CPU не скачет, память не утекает и в целом решение готово к продакшену.
Также нагрузочное тестирование может быть частью CI/CD-процесса, тогда мы будем уверены в работоспособности системы после каждого деплоя, а не только после штучной проверки по списку.
Нагрузочное тестирование Яндекс.Танком
Первым вариантом, как проводить нагрузочное тестирование, была идея тестировать платформу обработки данных как веб-сайт — с помощью Яндекс.Танка: сделать микросервис, который принимает данные и посылает их в Kafka, и его тестировать. Для этого нужно предварительно подготовить нагрузку («патроны» для танка), чтобы трассировать данные. Это занимает некоторое время, а также оперативную память и место на диске.
В целом удобно — результаты и графики, как проходит нагрузка, можно посмотреть в облачном интерфейсе. Минус в том, что этот способ подходит только для HTTP-подобных протоколов и нагрузить можно только веб-ендпойнты, которые смотрят наружу.
Нагрузочное тестирование с Apache JMeter
Другой продукт, который можно использовать для нагрузочного тестирования, — это Apache JMeter. Он больше подходит для тестирования именно распределенных систем, потому что входит в экосистему Apache Software Foundation: для него есть плагины для прямой нагрузки на Kafka; можно нагружать не только веб-интерфейс, но и сразу шину данных; можно пробовать напрямую тестировать БД и есть поддержка сообщества.
Преимущества JMeter в итоге оказались решающими — на нём и остановились. Но перед этим свой велосипед тоже попробовали написать.
Нагрузочное тестирование самописным генератором
Если мы хотим обеспечить профилирование всего конвейера обработки данных от источников до СХД с учётом транспорта и обработки, то нужно иметь возможность трассировать и определить ёмкость каждого участка пути.
Один из подходов, который используется в Zipkin и Jaeger, состоит в генерации огромного количества пакетов данных с уникальными идентификаторами, создании с их помощью нагрузки и отслеживании всех этапов за счёт trace_id. При этом на каждом участке можно сделать обработчик и измерить, за сколько данные проходят участок пути от источника до обработчика, от обработчика до БД и т.д.
Общая схема понятна и позволяет подробно оценить возможности системы обработки данных, но чаще всего это избыточный метод, который требует дополнительных усилий и ресурсов. Обычно лучше использовать JMeter.
Брокер сообщений: Apache Kafka vs RabbitMQ
Выбрали Apache Kafka благодаря широким возможностям именно в контексте работы с большими данными и нагрузками, такими как хранение журнала, масштабируемость и достоверность. Более подробно о сравнении RabbitMQ и Apache Kafka можно прочитать в отдельной статье.
Apache Spark vs Flink
Для того чтобы выбрать один из этих фреймворков, решили попробовать оба. Подробное сравнение проводили 3 года назад, и тогда бросилось в глаза, что документации, примеров больше у Apache Spark, да и в целом распространенность у него больше. Но проведенное нагрузочное тестирование простых примеров, написанных на обоих фреймворках, показало лучшие результаты у Apache Flink. Например, Игорь Кураленок из “Яндекс.Облако” на недавнем митапе упоминал, что для потоковых данных рекомендуется использовать Apache Flink. Поэтому, судя по всему, этот фреймворк тоже зрелый и его можно использовать в разработке.
Но мы сделали выбор в пользу Apache Spark.
Требования к СХД
Три самых важных параметра, которые нужно оценить и учесть, составляя требования к СХД:
-
время восстановления после сбоя (RTO);
-
пропускная способность на запись (для потоковых данных из разнородных источников);
-
пропускная способность на чтение (для аналитики и машинного обучения).
Сложность работы СХД в случае обработки больших объемов потоковых данных в том, что нужно одновременно и быстро писать, и читать. Аналитики обычно хотят работать со свежими данными, желательно, с теми, которые только что пришли, — чтобы сразу что-то смотреть и делать свои аналитические выводы. Нагрузка на СХД при этом возрастает, и нужно следить, к каким репликам должны подключаться аналитики, и вообще, должны ли они подключаться к репликам или лучше работать напрямую.
Единственное требование с точки зрения железа — держать серверы в одном дата-центре.
К сожалению, точные параметры назвать трудно, они зависят от каждого конкретного случая и отличаются для разных объемов данных и выполняемых задач. Иногда может быть достаточно 4-х выделенных серверов, иногда нужно 12. В каких-то случаях нужны большие диски, а в каких-то лучше шардирование и диски по 1-2 ТБайт. Бывает, что общий объем хранилища измеряется петабайтами, но для нас более привычны объемы в несколько десятков терабайт.
Холодные хранилища данных
Пробовали работать с разными холодными хранилищами, в том числе, с Apache Hadoop и Cassandra.
Наш опыт работы с Cassandra небольшой, использовать её напрямую в системе аналитики сложнее. Это более медленная система и лучше подходит для долговременных хранилищ. Мы же говорим о быстрой обработке больших объемов данных — как структурированных, так и неструктурированных. Они попадают в озеро данных на Hadoop и хранятся там в чистом виде, пока не понадобятся, а потом уже передаются в тот же Greenplum и там обрабатываются.
В работе с Apache Hadoop использовали такую систему:
-
Apache Ambari для развертывания и управления. Иногда разворачиваем Ansible-скриптами — так больше степеней свободы, но зачастую через интерфейс управлять хранилищем проще, а у Ambari он удобный и хорошо всё разворачивается.
-
Дублирование namenode (активное и пассивное) — чтобы обеспечить high availability.
-
Репликация datanode с фактором репликации по умолчанию 3, но если ресурсов не хватает, то реплика-фактор может быть и 2.
-
Поверх HDFS Hive для аналитики и SQL-запросов и HBase как альтернатива Cassandra.
Pivotal Greenplum
Как уже писал ранее, в контексте работы с аналитикой остановились на Greenplum. High availability обеспечиваем за счет:
-
дублирования мастера — аналог namenode в Hadoop;
-
зеркалирования сегментов — аналог datanode в Hadoop, который представляет собой просто сервер в PostgreSQL;
-
RAID на железном уровне — это единственная рекомендация по железу, во всём остальном говорим про софт.
Spark+Greenplum = ?
Один из ключевых моментов статьи: каким образом можно обработать потоковые данные и записать их в систему хранения данных, которая сделана на Greenplum? Как уже говорил, несмотря на то, что Greenplum — продукт с открытым исходным кодом, вся обвязка и поддержка требует лицензии и контакта с западным рынком. Сейчас в России продукт не поддерживается.
Чтобы связать Spark и Greenplum, есть несколько вариантов:
-
Можно скачать коннектор Spark- Greenplum. Но в нём нет исходного кода (просто документация) и периодически нет доступа. На письма тоже отвечают не всегда, и непонятно, с чем это связано.
-
Есть opensource-библиотека, которая быстрее JDBC в несколько раз — в readme написано, что в 100 раз, если записывать напрямую. Но у неё нет интерфейса, который поддерживался бы самим Spark.
-
Мы сделали и совсем недавно выложили в opensource на GitHub свой коннектор Spark и Greenplum.
Наш Spark Greenplum connector распространяется под лицензией MIT, поддерживает DataSource API v2 Spark (только для Spark v2). Сейчас он ещё в альфа-версии и работает только на чтение, в дальнейшем доработаем и на запись, но пока, как и в предыдущем варианте, можно записывать при помощи JDBC-запросов через копирование. Над кодом работали несколько человек, но последнюю сборку и доработку интерфейса осуществлял @hovercraft-habr, и он сможет подробнее ответить на вопросы про технические тонкости.
Если статья вас заинтересовала, то, возможно, вы захотите попробовать этот коннектор. Мы с коллегами с удовольствием ответим на вопросы и учтём комментарии!