Организуем платформу обработки потоковых данных из Kafka, Spark и Greenplum

Привет, Хабр!

Меня зовут Иван Хозяинов, а работаю в ITSumma, где изучаю и применяю технологии, связанные с большими данными, машинным обучением и аналитикой. В этой статье хочу рассказать о системе хранения и обработки данных и инструментах, которые встречаются на пути от сырых исходников до представления, удобного для последующего анализа.

Поговорим, как связаны серверы в дата-центре и распределенные приложения для обработки данных и почему пришлось написать свой коннектор для Spark и Greenplum.

Какую задачу решаем

Обычно задача платформы обработки потоковых данных состоит в том, чтобы собрать, сохранить и провести аналитику данных, используя их в дальнейшем для бизнес-аналитики, машинного обучения и т.п. Причём:

  • Источников данных несколько (5-10), и они разнородные, например: журналы, логи, данные датчиков, телеметрия. Данные из этих источников через интернет попадают на приемник данных, через шину данных передаются в обработчик, записываются и попадают в хранилище. В этой статье уделим особое внимание как раз путям наполнения СХД.

  • Каждый источник генерирует порядка 500-1500 RPS. Чаще всего это структурированные пакеты данных в avro, json или бинарном формате. Но формат у разных датчиков отличается, соответственно, нужно уметь парсить входящие пакеты, чтобы уже потом записывать в СХД и обрабатывать.

  • Система хранения данных подключается к системам аналитики (BI) или ML-моделям для обучения на исторических данных.

Требования к железу всегда индивидуальны, но чаще всего начать можно с базовой конфигурации:

  • RAM: 32G на каждый сервер;

  • CPU: 8 ядер на каждый сервер;

  • HDD+SSD: 16 TБайт суммарно;

  • Сеть: внутренняя сеть с пропускной способностью не менее 1 Гбит/с.

А для прототип подойдёт и что-нибудь попроще: например, dedicated-серверы, где можно разворачивать виртуальные машины. Прежде чем масштабировать систему сбора, хранения и обработки данных, лучше её проверить в небольшой песочнице. Мы чаще всего работаем с виртуализацией KVM, но бывает OpenStack в связке с Ceph. А когда всё отработано — можно переходить и к работе на кластерах.

Архитектура платформы

Сейчас мы используем примерно такую схему:

На схеме можно условно выделить два основных блока: ПО, которое обрабатывает данные и складывает их в распределенную систему хранения данных, и системы обеспечения CI/CD.

Путь данных от источников до хранилища и дальнейших аналитических запросов, имеющих бизнес-ценность, включает следующие этапы:

  • Данные из источников попадают в шину данных. В качестве шины используется Apache Kafka.

  • Собранные Kafka данные из различных источников попадают в Spark и там обрабатываются.

  • Обработанные данные записываются в Greenplum.

  • После этого с данными могут работать аналитики и ML-системы.

Дальше я постараюсь раскрыть, почему выбран тот или иной продукт и какие рассматривали альтернативы. Например, для работы с данными сейчас используем Greenplum (продукт Pivotal), но ещё пробовали использовать Hadoop с HDFS и поверх этого Hive. Однако Greenplum показался быстрее, проще и у него открытый исходный код. Единственная проблема с Pivotal — как и Confluent, они не распространяют и не поддерживают свои продукты в РФ. Поэтому чтобы связать Spark с Greenplum, понадобится использовать что-нибудь опенсорсное (какие есть варианты — обсудим ниже).

Каждый узел платформы обработки данных мониторится. Для этого используем связку Prometheus + Grafana, а также свою систему мониторинга серверных параметров, по которым администраторы реагируют на инциденты раньше, чем наступили бы серьезные последствия.

За обеспечение непрерывной доставки отвечает следующий конвейер:

  • коммит попадает в GitLab;

  • Jenkins запускает сборщик продукта;

  • запуск собранного задания производится в Apache Spark;

  • статический анализатор SonarQube проверяет ошибки;

  • с помощью JMeter производится нагрузочное тестирование собранного задания.

Версии компонентов

К сожалению, нельзя просто взять последние версии всех компонентов — они могут не заработать вместе, и прощай ночные сборки. Поэтому привожу наш набор совместимости:

Apache Spark — 2.4.6

Java (OpenJDK) — 1.8.0

Apache Kafka — 2.4.1

Pivotal Greenplum — 6.9.0

PostgreSQL — 12.2

Apache JMeter — 5.2.1

Apache Ignite — 2.8.1

Jenkins — 2.235.5-lts

GitLab — 13.6.3

Sonarqube — 8.4.1

Grafana — 6.7

Prometheus — 2.17.0

Нагрузочное тестирование

Для того чтобы оценить, насколько построенная схема жизнеспособна, проводим нагрузочное тестирование, которое включает в себя следующие шаги.

  • Генерация правдоподобных данных —чтобы наполнить хранилище данными и проверить дальнейшие этапы, нужно создать профиль нагрузки, похожий на реальный. Это могут быть фейковые журналы с сервера, сгенерированные данные с датчиков и т.д.

  • Проверка на разных профилях нагрузки — чтобы узнать поведение при пиковой, повышенной и равномерной нагрузке и выявить компоненты, требующие масштабирования.

  • Анализ узких мест и их масштабирование — например, может понадобиться масштабировать БД, добавить узлы в Greenplum или ресурсы для обработчика данных.

  • Оценка пропускной способности — чтобы выяснить, как будет работать данная система целиком, насколько она пригодна к реальной нагрузке. Поэтому убеждаемся, что нет никаких аномалий: CPU не скачет, память не утекает и в целом решение готово к продакшену.

Также нагрузочное тестирование может быть частью CI/CD-процесса, тогда мы будем уверены в работоспособности системы после каждого деплоя, а не только после штучной проверки по списку.

Нагрузочное тестирование Яндекс.Танком

Первым вариантом, как проводить нагрузочное тестирование, была идея тестировать платформу обработки данных как веб-сайт — с помощью Яндекс.Танка: сделать микросервис, который принимает данные и посылает их в Kafka, и его тестировать. Для этого нужно предварительно подготовить нагрузку («патроны» для танка), чтобы трассировать данные. Это занимает некоторое время, а также оперативную память и место на диске.

В целом удобно — результаты и графики, как проходит нагрузка, можно посмотреть в облачном интерфейсе. Минус в том, что этот способ подходит только для HTTP-подобных протоколов и нагрузить можно только веб-ендпойнты, которые смотрят наружу.

Нагрузочное тестирование с Apache JMeter

Другой продукт, который можно использовать для нагрузочного тестирования, — это Apache JMeter. Он больше подходит для тестирования именно распределенных систем, потому что входит в экосистему Apache Software Foundation: для него есть плагины для прямой нагрузки на Kafka; можно нагружать не только веб-интерфейс, но и сразу шину данных; можно пробовать напрямую тестировать БД и есть поддержка сообщества.

Преимущества JMeter в итоге оказались решающими — на нём и остановились. Но перед этим свой велосипед тоже попробовали написать.

Нагрузочное тестирование самописным генератором

Если мы хотим обеспечить профилирование всего конвейера обработки данных от источников до СХД с учётом транспорта и обработки, то нужно иметь возможность трассировать и определить ёмкость каждого участка пути.

Один из подходов, который используется в Zipkin и Jaeger, состоит в генерации огромного количества пакетов данных с уникальными идентификаторами, создании с их помощью нагрузки и отслеживании всех этапов за счёт trace_id. При этом на каждом участке можно сделать обработчик и измерить, за сколько данные проходят участок пути от источника до обработчика, от обработчика до БД и т.д.

Общая схема понятна и позволяет подробно оценить возможности системы обработки данных, но чаще всего это избыточный метод, который требует дополнительных усилий и ресурсов. Обычно лучше использовать JMeter.

Брокер сообщений: Apache Kafka vs RabbitMQ

Выбрали Apache Kafka благодаря широким возможностям именно в контексте работы с большими данными и нагрузками, такими как хранение журнала, масштабируемость и достоверность. Более подробно о сравнении RabbitMQ и Apache Kafka можно прочитать в отдельной статье.

Apache Spark vs Flink

Для того чтобы выбрать один из этих фреймворков, решили попробовать оба. Подробное сравнение проводили 3 года назад, и тогда бросилось в глаза, что документации, примеров больше у Apache Spark, да и в целом распространенность у него больше. Но проведенное нагрузочное тестирование простых примеров, написанных на обоих фреймворках, показало лучшие результаты у Apache Flink. Например, Игорь Кураленок из “Яндекс.Облако” на недавнем митапе упоминал, что для потоковых данных рекомендуется использовать Apache Flink. Поэтому, судя по всему, этот фреймворк тоже зрелый и его можно использовать в разработке.

Но мы сделали выбор в пользу Apache Spark.

Требования к СХД

Три самых важных параметра, которые нужно оценить и учесть, составляя требования к СХД:

  • время восстановления после сбоя (RTO);

  • пропускная способность на запись (для потоковых данных из разнородных источников);

  • пропускная способность на чтение (для аналитики и машинного обучения).

Сложность работы СХД в случае обработки больших объемов потоковых данных в том, что нужно одновременно и быстро писать, и читать. Аналитики обычно хотят работать со свежими данными, желательно, с теми, которые только что пришли, — чтобы сразу что-то смотреть и делать свои аналитические выводы. Нагрузка на СХД при этом возрастает, и нужно следить, к каким репликам должны подключаться аналитики, и вообще, должны ли они подключаться к репликам или лучше работать напрямую.

Единственное требование с точки зрения железа — держать серверы в одном дата-центре.

К сожалению, точные параметры назвать трудно, они зависят от каждого конкретного случая и отличаются для разных объемов данных и выполняемых задач. Иногда может быть достаточно 4-х выделенных серверов, иногда нужно 12. В каких-то случаях нужны большие диски, а в каких-то лучше шардирование и диски по 1-2 ТБайт. Бывает, что общий объем хранилища измеряется петабайтами, но для нас более привычны объемы в несколько десятков терабайт.

Холодные хранилища данных

Пробовали работать с разными холодными хранилищами, в том числе, с Apache Hadoop и Cassandra.

Наш опыт работы с Cassandra небольшой, использовать её напрямую в системе аналитики сложнее. Это более медленная система и лучше подходит для долговременных хранилищ. Мы же говорим о быстрой обработке больших объемов данных — как структурированных, так и неструктурированных. Они попадают в озеро данных на Hadoop и хранятся там в чистом виде, пока не понадобятся, а потом уже передаются в тот же Greenplum и там обрабатываются.

В работе с Apache Hadoop использовали такую систему:

  • Apache Ambari для развертывания и управления. Иногда разворачиваем Ansible-скриптами — так больше степеней свободы, но зачастую через интерфейс управлять хранилищем проще, а у Ambari он удобный и хорошо всё разворачивается.

  • Дублирование namenode (активное и пассивное) — чтобы обеспечить high availability.

  • Репликация datanode с фактором репликации по умолчанию 3, но если ресурсов не хватает, то реплика-фактор может быть и 2.

  • Поверх HDFS Hive для аналитики и SQL-запросов и HBase как альтернатива Cassandra.

Pivotal Greenplum

Как уже писал ранее, в контексте работы с аналитикой остановились на Greenplum. High availability обеспечиваем за счет:

  • дублирования мастера — аналог namenode в Hadoop;

  • зеркалирования сегментов — аналог datanode в Hadoop, который представляет собой просто сервер в PostgreSQL;

  • RAID на железном уровне — это единственная рекомендация по железу, во всём остальном говорим про софт.

Spark+Greenplum = ?

Один из ключевых моментов статьи: каким образом можно обработать потоковые данные и записать их в систему хранения данных, которая сделана на Greenplum? Как уже говорил, несмотря на то, что Greenplum — продукт с открытым исходным кодом, вся обвязка и поддержка требует лицензии и контакта с западным рынком. Сейчас в России продукт не поддерживается.

Чтобы связать Spark и Greenplum, есть несколько вариантов:

  • Можно скачать коннектор Spark- Greenplum. Но в нём нет исходного кода (просто документация) и периодически нет доступа. На письма тоже отвечают не всегда, и непонятно, с чем это связано.

  • Есть opensource-библиотека, которая быстрее JDBC в несколько раз — в readme написано, что в 100 раз, если записывать напрямую. Но у неё нет интерфейса, который поддерживался бы самим Spark.

  • Мы сделали и совсем недавно выложили в opensource на GitHub свой коннектор Spark и Greenplum.

Наш Spark Greenplum connector распространяется под лицензией MIT, поддерживает DataSource API v2 Spark (только для Spark v2). Сейчас он ещё в альфа-версии и работает только на чтение, в дальнейшем доработаем и на запись, но пока, как и в предыдущем варианте, можно записывать при помощи JDBC-запросов через копирование. Над кодом работали несколько человек, но последнюю сборку и доработку интерфейса осуществлял @hovercraft-habr, и он сможет подробнее ответить на вопросы про технические тонкости.

Если статья вас заинтересовала, то, возможно, вы захотите попробовать этот коннектор. Мы с коллегами с удовольствием ответим на вопросы и учтём комментарии!

Let’s block ads! (Why?)

Read More

Recent Posts

Apple возобновила переговоры с OpenAI и Google для интеграции ИИ в iPhone

Apple возобновила переговоры с OpenAI о возможности внедрения ИИ-технологий в iOS 18, на основе данной операционной системы будут работать новые…

2 дня ago

Российская «дочка» Google подготовила 23 иска к крупнейшим игрокам рекламного рынка

Конкурсный управляющий российской «дочки» Google подготовил 23 иска к участникам рекламного рынка. Общая сумма исков составляет 16 млрд рублей –…

2 дня ago

Google завершил обновление основного алгоритма March 2024 Core Update

Google завершил обновление основного алгоритма March 2024 Core Update. Раскатка обновлений была завершена 19 апреля, но сообщил об этом поисковик…

2 дня ago

Нейросети будут писать тексты объявления за продавцов на Авито

У частных продавцов на Авито появилась возможность составлять текст объявлений с помощью нейросети. Новый функционал доступен в категории «Обувь, одежда,…

2 дня ago

Объявлены победители международной премии Workspace Digital Awards-2024

24 апреля 2024 года в Москве состоялась церемония вручения наград международного конкурса Workspace Digital Awards. В этом году участниками стали…

3 дня ago

Яндекс проведет гик-фестиваль Young Con

27 июня Яндекс проведет гик-фестиваль Young Con для студентов и молодых специалистов, которые интересуются технологиями и хотят работать в IT.…

3 дня ago