[Перевод] Масштабирование итеративных алгоритмов в Spark
Итеративные алгоритмы широко применяются в машинном обучении, связанных компонентах, ранжировании страниц и т.д. Эти алгоритмы усложняются итерациями, размеры данных на каждой итерации увеличивается, и сделать их отказоустойчивыми на каждой итерации непросто.
В этой статье я бы подробно остановился на некоторых моментах, которые необходимо учитывать при работе с этими задачами. Мы использовали Spark для реализации нескольких итерационных алгоритмов, таких как построение связанных компонентов, обход больших связанных компонентов и т.д. Ниже приведен мой опыт работы в лабораториях Walmart по построению связанных компонентов для 60 миллиардов узлов клиентской идентификации.
Типы итеративных алгоритмов
Конвергентные данные: Здесь мы видим, что с каждой итерацией количество данных уменьшается, т.е. мы начинаем 1-ю итерацию с огромными наборами данных, а размер данных уменьшается с увеличением количества итераций. Основной задачей будет работа с огромными наборами данных в первых нескольких итерациях, и после того, как набор данных значительно уменьшится, можно будет легко справляться с дальнейшими итерациями до их завершения.
Расходящиеся данные: Количество данных увеличивается при каждой итерации, и иногда они могут появляться быстрее и сделать невозможной дальнейшую работу. Для работы этих алгоритмов необходимы такие ограничения, как ограничения по количеству итераций, начальному размеру данных, вычислительной мощности и т.д.
Аналогичные данные: На каждой итерации у нас были бы более или менее одинаковые данные, и с таким алгоритмом было бы очень легко работать.
Инкрементные данные: На каждой итерации у нас могут появляться новые данные, особенно в ML у нас могут появляться новые обучающие наборы данных с периодическими интервалами.
Препятствия
-
RDD линии: Одним из распространенных способов сохранения отказоустойчивости системы является хранение копий данных в разных местах, чтобы в случае падения одного узла у нас была копия, которая помогала бы до тех пор, пока узел не восстановится. Но Spark не поддерживает дубликаты данных, а поддерживает линейный график преобразований, выполненных на данных в драйвере. Поэтому такой линейный график был бы полезен, если какой-либо фрагмент данных отсутствует, он может построить его обратно с помощью линейного графика, следовательно, Spark является отказоустойчивым. По мере того, как линейный график становится большим, становится трудно строить данные обратно, так как количество итераций увеличивается.
-
Память и дисковый ввод/вывод: В Spark RDD являются непреложными, поэтому на каждой итерации мы будем создавать новую копию преобразованных данных (новый RDD), что увеличит использование Памяти и Диска. По мере того, как итерации будут увеличивать использование диска/памяти исполнителями, это может привести к замедлению работы из-за нехватки памяти и ожиданию, пока GC выполнит очистку. В некоторых случаях куча памяти будет недостаточной и может привести к невозможности выполнения задачи.
-
Размер задачи: В некоторых случаях может быть несколько задач, которые могут не подходить для одного исполнителя, или одна задача занимает гораздо больше времени, чем остальные задачи, что может привести к препятствию.
Советы по преодолению вышеуказанных проблем
-
Хранение большого линейного графика в памяти, и, в случае сбоя узла, восстановление потерянных наборов данных займет много времени. В таких случаях можно использовать кэширование или запись данных о состоянии в контрольной точке на каждой N итерации. Это сохранит рассчитанный RDD на каждой N итерации (кэширование будет храниться в памяти или на диске исполнителей, запись данных о состоянии в контрольной точке использует HDFS, мы должны принять решение исходя из нашей потребности, так как скорость будет различаться для каждой из них). В случае неудачи RDD вычисляется обратно от последней контрольной точки/кэширования. Вместо использования двух вышеупомянутых методов можно также создать временную таблицу и сохранить вычисленный набор данных, разделенный итерацией. В случае неудачи задания Spark, можно сделать перезапуск с последней N-ой итерации, а преимущество сохранения во временную таблицу состоит в том, что можно избавиться от линейного графика RDD до этой итерации и запустить свежий линейный график с этой точки. По мере того, как линейный график RDD растет в итерационных алгоритмах, нам необходимо строить гибридные решения с использованием кэширования, контрольных точек (см. ссылку [2]) и временных таблиц для различных вариантов использования.
-
Как и выше, сохранение во временную таблицу и чтение из временной таблицы может помочь избавиться от линейного графика и очистить память и диск предыдущих RDD. Такая запись и чтение замедляет процесс, но это даст огромное преимущество при работе с большими наборами данных. Особенно при конвергировании наборов данных, нам может понадобиться выполнять этот процесс только в течение первых нескольких итераций и использовать кэширование, когда наборы данных становятся маленькими при работе с итерациями. Экономия во временной таблице в качестве контрольной точки выглядит тривиально, но она не просто действует как контрольная точка. Так как мы избавляемся от истории линейных графов, делая это на периодических итерациях, это уменьшит риск сбоя в работе и сократит время на построение ее обратной копии из потерянных данных.
-
Работа с расходящимися данными сложна, так как размер каждой задачи будет увеличиваться с увеличением количества итераций и займет намного больше времени для каждого исполнителя. Поэтому нам нужен фактор для вычисления количества задач в ( i + 1) итерации по сравнению с i-й итерацией таким образом, чтобы размер задачи остался прежним. Например, скажем, количество задач в i-й итерации — 100, и каждая задача обрабатывает около 100 МБ данных. В i+1 итерации размер каждой задачи увеличивается до 150 МБ, и мы можем перетасовать эти 100 задач до 150 задач и оставить 100 МБ на каждую задачу. Таким образом, в расходящихся наборах данных нам необходимо увеличить количество задач, переструктурировав и изменив перетасованные разделы на основе итерации.
-
В случаях, когда размер spark задачи огромен, попробуйте увеличить память исполнителя в соответствии с размером задачи. А если нужно выполнить соединения на искаженных наборах данных, где 10% задач занимает 90% времени исполнения и 90% задач выполняются за 10% времени, эти задачи предлагается обрабатывать отдельно, выполняя их в виде двух разных запросов. Нужно определить причину больших задач, и можем ли мы разделить их на две группы, т.е. маленькие и большие задачи. В 1-м запросе мы бы обработали 90% задач, т.к. нет никаких препятствий для их обработки, и это заняло бы 10% времени, как и раньше. В другом запросе мы бы обрабатывали большие задачи (10% задач) с помощью всенаправленного соединения, так как количество таких задач меньше, а также избегали бы перетасовки данных.
Пример: Допустим, у нас есть таблица А и таблица Б. Таблица А — это данные о населении со столбцами user_id
, имя
, город
, штат
. Таблица B — это то, что группирует данные со столбцами user_id
, group_id
. Например, мы пытаемся найти 5 крупнейших городов с наибольшим количеством используемых групп. В этом примере могут быть тупиковые ситуации, как города с большим количеством населения могут быть большой задачей, пользователи с большим количеством групп могут привести к большим задачам. Для решения этих тупиковых ситуаций, объединение между этими таблицами может быть сделано в двух запросах. Мы можем отфильтровать больших пользователей с большим количеством групп (скажем, порог в 1000 групп на пользователя) и относиться к ним как к большим задачам. И выполнять соединения отдельно для больших пользователей, используя всенаправленное объединение, так как количество больших пользователей будет мало по сравнению с общими данными. Аналогичным образом, для остальных пользователей выполняйте тасовку объединения и объединяйте результаты и агрегируйте по городам, чтобы найти 5 лучших городов.
А прямо сейчас в OTUS открыт набор на курс «Экосистема Hadoop, Spark, Hive».
Всех желающих приглашаем записаться на бесплатный демо-урок по теме «Spark Streaming».