[Перевод] Масштабирование итеративных алгоритмов в Spark


Итеративные алгоритмы широко применяются в машинном обучении, связанных компонентах, ранжировании страниц и т.д. Эти алгоритмы усложняются итерациями, размеры данных на каждой итерации увеличивается, и сделать их отказоустойчивыми на каждой итерации непросто.

В этой статье я бы подробно остановился на некоторых моментах, которые необходимо учитывать при работе с этими задачами. Мы использовали Spark для реализации нескольких итерационных алгоритмов, таких как построение связанных компонентов, обход больших связанных компонентов и т.д. Ниже приведен мой опыт работы в лабораториях Walmart по построению связанных компонентов для 60 миллиардов узлов клиентской идентификации.

Количество итераций никогда не предопределено, всегда есть условие завершения ?.
Количество итераций никогда не предопределено, всегда есть условие завершения ?.

Типы итеративных алгоритмов

Конвергентные данные: Здесь мы видим, что с каждой итерацией количество данных уменьшается, т.е. мы начинаем 1-ю итерацию с огромными наборами данных, а размер данных уменьшается с увеличением количества итераций. Основной задачей будет работа с огромными наборами данных в первых нескольких итерациях, и после того, как набор данных значительно уменьшится, можно будет легко справляться с дальнейшими итерациями до их завершения.

Расходящиеся данные: Количество данных увеличивается при каждой итерации, и иногда они могут появляться быстрее и сделать невозможной дальнейшую работу. Для работы этих алгоритмов необходимы такие ограничения, как ограничения по количеству итераций, начальному размеру данных, вычислительной мощности и т.д.

Аналогичные данные: На каждой итерации у нас были бы более или менее одинаковые данные, и с таким алгоритмом было бы очень легко работать.

Инкрементные данные: На каждой итерации у нас могут появляться новые данные, особенно в ML у нас могут появляться новые обучающие наборы данных с периодическими интервалами.

Препятствия 

  1. RDD линии: Одним из распространенных способов сохранения отказоустойчивости системы является хранение копий данных в разных местах, чтобы в случае падения одного узла у нас была копия, которая помогала бы до тех пор, пока узел не восстановится. Но Spark не поддерживает дубликаты данных, а поддерживает линейный график преобразований, выполненных на данных в драйвере. Поэтому такой линейный график был бы полезен, если какой-либо фрагмент данных отсутствует, он может построить его обратно с помощью линейного графика, следовательно, Spark является отказоустойчивым. По мере того, как линейный график становится большим, становится трудно строить данные обратно, так как количество итераций увеличивается.

  2. Память и дисковый ввод/вывод: В Spark RDD являются непреложными, поэтому на каждой итерации мы будем создавать новую копию преобразованных данных (новый RDD), что увеличит использование Памяти и Диска. По мере того, как итерации будут увеличивать использование диска/памяти исполнителями, это может привести к замедлению работы из-за нехватки памяти и ожиданию, пока GC выполнит очистку. В некоторых случаях куча памяти будет недостаточной и может привести к невозможности выполнения задачи.

  3. Размер задачи: В некоторых случаях может быть несколько задач, которые могут не подходить для одного исполнителя, или одна задача занимает гораздо больше времени, чем остальные задачи, что может привести к препятствию.

Советы по преодолению вышеуказанных проблем

  1. Хранение большого линейного графика в памяти, и, в случае сбоя узла, восстановление потерянных наборов данных займет много времени. В таких случаях можно использовать кэширование или запись данных о состоянии в контрольной точке на каждой N итерации. Это сохранит рассчитанный RDD на каждой N итерации (кэширование будет храниться в памяти или на диске исполнителей, запись данных о состоянии в контрольной точке использует HDFS, мы должны принять решение исходя из нашей потребности, так как скорость будет различаться для каждой из них). В случае неудачи RDD вычисляется обратно от последней контрольной точки/кэширования. Вместо использования двух вышеупомянутых методов можно также создать временную таблицу и сохранить вычисленный набор данных, разделенный итерацией.  В случае неудачи задания Spark, можно сделать перезапуск с последней N-ой итерации, а преимущество сохранения во временную таблицу состоит в том, что можно избавиться от линейного графика RDD до этой итерации и запустить свежий линейный график с этой точки. По мере того, как линейный график RDD растет в итерационных алгоритмах, нам необходимо строить гибридные решения с использованием кэширования, контрольных точек (см. ссылку [2]) и временных таблиц для различных вариантов использования.

  2. Как и выше, сохранение во временную таблицу и чтение из временной таблицы может помочь избавиться от линейного графика и очистить память и диск предыдущих RDD. Такая запись и чтение замедляет процесс, но это даст огромное преимущество при работе с большими наборами данных. Особенно при конвергировании наборов данных, нам может понадобиться выполнять этот процесс только в течение первых нескольких итераций и использовать кэширование, когда наборы данных становятся маленькими при работе с итерациями. Экономия во временной таблице в качестве контрольной точки выглядит тривиально, но она не просто действует как контрольная точка. Так как мы избавляемся от истории линейных графов, делая это на периодических итерациях, это уменьшит риск сбоя в работе и сократит время на построение ее обратной копии из потерянных данных.

  3. Работа с расходящимися данными сложна, так как размер каждой задачи будет увеличиваться с увеличением количества итераций и займет намного больше времени для каждого исполнителя. Поэтому нам нужен фактор для вычисления количества задач в ( i + 1) итерации по сравнению с i-й итерацией таким образом, чтобы размер задачи остался прежним. Например, скажем, количество задач в i-й итерации — 100, и каждая задача обрабатывает около 100 МБ данных. В i+1 итерации размер каждой задачи увеличивается до 150 МБ, и мы можем перетасовать эти 100 задач до 150 задач и оставить 100 МБ на каждую задачу. Таким образом, в расходящихся наборах данных нам необходимо увеличить количество задач, переструктурировав и изменив перетасованные разделы на основе итерации.

  4. В случаях, когда размер spark задачи огромен, попробуйте увеличить память исполнителя в соответствии с размером задачи. А если нужно выполнить соединения на искаженных наборах данных, где 10% задач занимает 90% времени исполнения и 90% задач выполняются за 10% времени, эти задачи предлагается обрабатывать отдельно, выполняя их в виде двух разных запросов. Нужно определить причину больших задач, и можем ли мы разделить их на две группы, т.е. маленькие и большие задачи. В 1-м запросе мы бы обработали 90% задач, т.к. нет никаких препятствий для их обработки, и это заняло бы 10% времени, как и раньше. В другом запросе мы бы обрабатывали большие задачи (10% задач) с помощью всенаправленного соединения, так как количество таких задач меньше, а также избегали бы перетасовки данных.

Пример: Допустим, у нас есть таблица А и таблица Б. Таблица А — это данные о населении со столбцами user_id, имя, город, штат. Таблица B — это то, что группирует данные со столбцами user_id, group_id. Например, мы пытаемся найти 5 крупнейших городов с наибольшим количеством используемых групп. В этом примере могут быть тупиковые ситуации, как города с большим количеством населения могут быть большой задачей, пользователи с большим количеством групп могут привести к большим задачам. Для решения этих тупиковых ситуаций, объединение между этими таблицами может быть сделано в двух запросах. Мы можем отфильтровать больших пользователей с большим количеством групп (скажем, порог в 1000 групп на пользователя) и относиться к ним как к большим задачам. И выполнять соединения отдельно для больших пользователей, используя всенаправленное объединение, так как количество больших пользователей будет мало по сравнению с общими данными. Аналогичным образом, для остальных пользователей выполняйте тасовку объединения и объединяйте результаты и агрегируйте по городам, чтобы найти 5 лучших городов.


А прямо сейчас в OTUS открыт набор на курс «Экосистема Hadoop, Spark, Hive».

Всех желающих приглашаем записаться на бесплатный демо-урок по теме «Spark Streaming».

ЗАБРАТЬ СКИДКУ

Let’s block ads! (Why?)

Read More

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *