Работа с асинхронностью в Dart
Всем привет! Меня зовут Дмитрий Репин, я Flutter-разработчик в Surf.
В этой статье я расскажу, как работать с асинхронностью в Dart: всё о самых важных классах библиотеки dart:async с примерами под катом. Поговорим о том, как в однопоточном языке сходить в сеть или базу данных и при этом не затормозить приложение.
Эта статья написана по материалам моего ролика на YouTube. Посмотрите видео, если больше любите слушать, чем читать.
Dart — однопоточный язык программирования, который выполняется в одном процессе. Что это значит по факту: если мы будем в одном потоке выполнять любую операцию, требующую времени, то приложение просто подвиснет. И всё время, пока мы, например, ждём ответа от сервера или выполнения запроса в БД, пользователь будет страдать и смотреть на лагающий интерфейс.
К счастью, Dart — хитрый однопоточный язык, который предоставляет механизм Event Loop — он даёт возможность откладывать какие-то операции на потом, когда поток будет посвободнее. Такие отложенные операции мы будем называть асинхронными.
Все операции в Dart можно разделить на два типа:
-
Синхронные — те, что блокируют другие операции до своего выполнения.
-
Асинхронные, которые позволяют другим операциям выполняться, пока текущая не закончится.
Вместе с операциями, функции тоже можно поделить на синхронные и асинхронные. И синхронные, и асинхронные функции могут содержать синхронные операции. Асинхронной функция начнёт считаться в тот момент, когда в ней появляется хотя бы одна асинхронная операция.
Создание асинхронной функции с помощью класса Future
Чтобы было нагляднее, давайте напишем простую асинхронную функцию. Пусть она будет принимать строку и смотреть на её длину: если она больше 10 символов, функция вернёт число 42, в ином случае — ошибку. Представим, что вычисление длины строки — достаточно тяжеловесная операция чтобы сделать её асинхронной.
Чтобы сделать код асинхронным, нам понадобится класс Future. В конструктор он принимает функцию, которую необходимо выполнить асинхронно, а также предоставляет два обработчика: then и catchError. Первый обрабатывает успешное выполнение функции, а второй — выполнение функции с ошибкой. В итоге у нас должен получиться такой код:
runSimpleFutureExample({
String question = 'В чем смысл жизни и всего такого?',
}) {
print('Start of future example');
Future(() {
print('Вопрос: $question');
if (question.length > 10) {
return 42;
} else {
throw Exception('Вы задали недостаточно сложный вопрос.');
}
}).then((result) {
print('Ответ: $result');
}).catchError((error) {
print('Ошибка. $error');
});
print('Finish of future example');
}
В начале и конце метода выводятся строки о том, что он начал и закончил работу, а между ними создаётся сам future. В конструктор он принимает метод, который вычисляет длину строки, а затем к Future добавляются обработчики then и catchError, которые обрабатывают выполнение Future. Попробуем запустить этот пример и посмотрим, в каком порядке выводится на консоль результат:
Start of future example
Finish of future example
Вопрос: В чем смысл жизни и всего такого?
Ответ: 42
В результате сначала выводятся строки о начале и завершении метода, а потом выполняется код самого Future. Дело в том, что асинхронный код в Dart выполняется исключительно после синхронного. Соответственно, в синхронном коде мы не можем обработать результат асинхронной операции — при работе с асинхронщиной стоит держать это в голове.
Работать с Future через обработчики then и catchError — не самая хорошая идея, особенно если нужно передать результат одного Future во второй, а второго в третий и т. д., порождая callback hell.
Для таких случаев Dart предоставляет ключевые слова async и await. Словом async помечается функция, исполняющая асинхронные операции через await. Словом await помечаются сами асинхронные операции. К сожалению, при таком подходе мы теряем обработчик catchError, однако никто не мешает нам обрабатывать ошибки через стандартный try/catch. Чтобы понять, как это работает, перепишем наш предыдущий пример с использованием async/await и для удобства вынесем Future в отдельный метод:
runFutureWithAwaitExample({
String question = 'В чем смысл жизни и всего такого?',
}) async {
print('Start of future example');
try {
final result = await _getAnswerForQuestion(
'В чем смысл жизни и всего такого?',
);
print('Ответ: $result');
} catch (error) {
print('Ошибка. $error');
}
print('Finish of future example');
}
Future<int> _getAnswerForQuestion(String question) => Future(() {
print('Вопрос: $question');
if (question.length > 10) {
return 42;
} else {
throw Exception('Вы задали недостаточно сложный вопрос.');
}
});
Здесь мы объявили метод, возвращающий Future. Вызываем его с использованием слова await, передавая в переменную result, с которой мы можем работать дальше, как будто в синхронном коде. Выведем на консоль результат работы метода:
Start of future example
Вопрос: В чем смысл жизни и всего такого?
Ответ: 42
Finish of future example
Как видим, теперь Future обрабатывается внутри метода, а не после его завершения. Но при этом не стоит забывать, что сам метод теперь стал асинхронным и выполнится только после выполнения синхронного кода.
Обработка последовательности асинхронных событий с помощью Stream
Помимо Future, есть ещё один важный класс для работы с асинхронностью — Stream. В отличие от Future, Stream может обрабатывать целый набор асинхронных событий. Давайте сразу разберём на примере:
void runStreamSimpleExample() {
print('Simple stream example started');
final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
stream.listen((number) {
print('listener: $number');
});
print('Simple stream example finished');
}
У стрима много разных конструкторов, посмотрите их в документации. Здесь используем fromIterable. Создаём в массиве пять чисел, подписываемся на стрим через метод listen и выводим на консоль:
Simple stream example started
Simple stream example finished
listener: 1
listener: 2
listener: 3
listener: 4
listener: 5
Как и в случае с Future, сначала выполняется синхронный код: сообщения о старте и завершения примера. Только потом обрабатывается стрим и выводятся на консоль числа. В этом примере подписка на стрим обрабатывается асинхронно через listen, однако есть способ обрабатывать стрим через знакомые нам ключевые слова async и await. Перепишем пример с их использованием:
void runStreamAwaitedSimpleExample() async {
print('Simple stream example with await started');
final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
await for (final number in stream) {
print('Number: $number');
}
print('Simple stream example with await finished');
}
Теперь обработка стрима похожа на обычный цикл. Но не стоит обрабатывать слишком большие стримы таким образом: это может затормозить работу программы. Выведем результат на консоль:
Simple stream example started
listener: 1
listener: 2
listener: 3
listener: 4
listener: 5
Simple stream example finished
Тут всё так же, как и с async/await во future. Сама функция стала асинхронной, поэтому теперь обработка Stream происходит до завершения метода.
Single-subscription и broadcast стримы. Основы StreamController
Представим, что нам нужно подписаться на стрим в двух разных местах программы. Для этого можно сделать какой-нибудь синглтон, который бы предоставлял стрим и который можно было бы дёргать в двух разных частях кода. Чтобы не усложнять, попробуем подписаться на стрим дважды в одном методе, и посмотрим, что получится:
print('Simple stream example started');
final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
stream.listen((number) {
print('listener 1: $number');
});
stream.listen((number) {
print('listener 2: $number');
});
print('Simple stream example finished');
И результат:
The following StateError was thrown while handling a gesture.
Bad state: Stream has already been listened to.
Мы получаем ошибку: на стрим уже произведена подписка и подписаться второй раз не выйдет.
Здесь стоит рассказать о том, что существует два типа стримов: single-subscription и broadcast стримы. Мы создали single-subscription стрим. Такие стримы поставляют все данные подписчику разом и только после самой подписки. Так и происходит в нашем примере.
Broadcast стримы отдают свои данные вне зависимости от того, подписан ли кто-нибудь на них или нет. При этом подписчики стрима получают события только с момента подписки, а не с момента старта жизни стрима. При создании стрима стоит посмотреть документацию к конструктору и разобраться, какие ограничения он накладывает и стрим какого типа создаётся через этот конструктор.
Чтобы увидеть разницу, создадим broadcast стрим. В этом примере для создания стрима мы не будем использовать непосредственно сам Stream. Мы будем работать с классом StreamController. Он предоставляет доступ к самому стриму, даёт возможность управлять им и добавлять в него события.
///пример broadcast стрима
void runBroadcastStreamExample() {
print('Broadcast stream example started');
final streamController = StreamController.broadcast();
streamController.stream.listen((number) {
print('Listener 1: $number');
});
streamController.stream.listen((number) {
print('Listener 2: $number');
});
streamController.sink.add(1);
streamController.sink.add(2);
streamController.sink.add(3);
streamController.sink.add(4);
streamController.sink.add(5);
streamController.close();
print('Broadcast stream example finished');
}
Мы создали StreamController с broadcast стримом, дважды подписались на него и добавили через sink пять чисел, после чего закрыли стрим. Смотрим, что ушло на консоль:
Broadcast stream example started
Broadcast stream example finished
Listener 1: 1
Listener 2: 1
Listener 1: 2
Listener 2: 2
Listener 1: 3
Listener 2: 3
Listener 1: 4
Listener 2: 4
Listener 1: 5
Listener 2: 5
Проблем не возникло: два подписчика отработали, как и ожидалось. На самом деле у StreamController достаточно много функций, рекомендую почитать документацию к нему.
Отписка от стрима с помощью StreamSubscription
Помимо управления самим стримом часто возникает потребность управлять подпиской на него — для этого в dart:async добавлен StreamSubscription. Кстати, мы уже неявно работали с ним в наших предыдущих примерах, когда подписывались на стрим: на самом деле метод listen возвращает его, и таким образом мы можем выделить подписку в переменную. Модифицируем предыдущий пример так, чтобы вторая подписка отменялась после числа 3:
///пример broadcast стрима
void runBroadcastStreamExample() {
print('Broadcast stream example started');
final streamController = StreamController.broadcast();
streamController.stream.listen((number) {
print('Listener 1: $number');
});
StreamSubscription sub2;
sub2 = streamController.stream.listen((number) {
print('Listener 2: $number');
if (number == 3) {
sub2.cancel();
}
});
streamController.sink.add(1);
streamController.sink.add(2);
streamController.sink.add(3);
streamController.sink.add(4);
streamController.sink.add(5);
streamController.close();
print('Broadcast stream example finished');
}
Тут без сюрпризов, на консоль выводится то, что мы ожидали:
Broadcast stream example started
Broadcast stream example finished
Listener 1: 1
Listener 2: 1
Listener 1: 2
Listener 2: 2
Listener 1: 3
Listener 2: 3
Listener 1: 4
Listener 1: 5
Стоит отметить, что StreamSubscription не только даёт возможность отменять подписку полностью, но и приостанавливать и возобновлять её, а также предоставляет колбэки на события подписки.
Как упростить работу с асинхронностью
Мы разобрали все самые важные классы для dart:async, но осталось ещё три, о которых я бы хотел рассказать. Возможно, они используются не так часто, но могут быть очень полезны в определённых ситуациях.
Completer
Completer позволяет поставлять Future, отправлять событие о выполнении или событие об ошибке. Это может быть полезно, когда нужно сделать цепочку Future и вернуть результат.
Разберём на примере. Пусть у нас будет completer, который складывает два числа с трёхсекундной задержкой. Это очень простой пример, но важно понимать, что тут можно сделать большую вложенность из Future и компактно уместить в один класс.
class CompleterTester {
void runCompleterInitTest() async {
print('Completer example started');
var sumCompleter = SumCompleter();
var sum = await sumCompleter.sum(20, 22);
print('Completer result: ' + sum.toString());
print('Completer example finished');
}
}
class SumCompleter {
Completer<int> completer = Completer();
Future<int> sum(int a, int b) {
_sumAsync(a, b);
return completer.future;
}
void _sumAsync(int a, int b) {
Future.delayed(Duration(seconds: 3), () {
return a + b;
}).then((value) {
completer.complete(value);
});
}
}
Разберём, что тут произошло. Мы создали класс SumCompleter и создали в нём completer. Затем объявили метод sum, который вызывает приватный метод на сложение и возвращает Future этого completer. Затем этот приватный метод выполняет операцию и вызывает метод complete у комплитера. После этого пользователь метода получает результат.
Такой подход позволяет инкапсулировать в себе любую сложную часто используемую асинхронную логику и удобно с ней работать.
StreamIterator
Представьте, что вам нужно управлять переходом к следующему айтему стрима и делать это именно тогда, когда вам нужно. Именно это и делает StreamIterator. Он предоставляет метод moveNext для перехода к следующему элементу стрима. moveNext вернет true, если элемент пришел, и false, если стрим был закрыт. Также StreamIterator предоставляет свойство current для получения текущего элемента. Ну и конечно, метод cancel для отмены подписки. Небольшой пример по работе класса:
void runStreamIteratorExample() async {
print('StreamIteratorExample started');
var stream = Stream.fromIterable([1, 2, 3]);
var iterator = StreamIterator(stream);
bool moveResult;
do {
moveResult = await iterator.moveNext();
print('number: ${iterator.current}');
} while (moveResult);
print('StreamIteratorExample finished');
}
С использованием этого StreamIterator мы будем переходить к следующему числу только в то время, когда нам это удобно, выполнив перед этим все нужные действия.
StreamTransformer
На самом деле это немного переусложнённая штука, но она отлично справляется, если вам нужно много раз трансформировать поток в приложении определённым образом. Самый очевидный пример — разделение файла на строки. Разберём на примере и напишем трансформер, который будет удваивать приходящее в него число:
void runStreamTransformerExample() async {
print('StreamTransformer example started');
StreamTransformer doubleTransformer =
new StreamTransformer.fromHandlers(handleData: (data, EventSink sink) {
sink.add(data * 2);
});
StreamController controller = StreamController();
controller.stream.transform(doubleTransformer).listen((data) {
print('data: $data');
});
controller.add(1);
controller.add(2);
controller.add(3);
print('StreamTransformer example finished');
}
В нашем случае он будет принимать на вход данные, которые нужно трансформировать, и sink, в который мы должны передать трансформированные данные. В теле метода просто удваиваем приходящее значение. Затем остаётся только трансформировать стрим с помощью метода transform. И вуаля: в методе listen значение удваивается. Обратите внимание: трансформер может и не отдать данные в синк, а может отдать два раза. То есть он работает не только как map или where, а наделён гораздо большей функциональностью.
Рекомендую поиграться с этими примерами — они есть на нашем GitHub. Так вы точно поймете, как устроена асинхронность в Dart. Если что-то непонятно, задавайте вопросы — я отвечу в комментариях.