Параллельные вычисления в настоящее время являются одной из самых приоритетных задач в области информатики. Если посмотреть на список TOP 100 (самые быстродействующие компьютеры в мире), легко заметить, что вся верхушка списка занята системами с массовым параллелизмом, состоящим из нескольких сотен, а то и тысяч процессорных блоков. Основная проблема заключается в том, что собрать такую систему зачастую оказывается проще, чем использовать, т.к. реализовать алгоритм, который будет эффективно распараллеливать свою работу на сотнях имеющихся в наличии вычислительных блоках, очень непросто. А от не адаптированного алгоритма толку на подобном суперкомпьютере не будет – скорость его выполнения будет не намного выше, чем на обычном PC.
1. Зачем это нужно?
Параллельные вычисления в настоящее время являются одной из самых приоритетных задач в области информатики. Если посмотреть на список TOP 100 (самые быстродействующие компьютеры в мире), легко заметить, что вся верхушка списка занята системами с массовым параллелизмом, состоящим из нескольких сотен, а то и тысяч процессорных блоков. Основная проблема заключается в том, что собрать такую систему зачастую оказывается проще, чем использовать, т.к. реализовать алгоритм, который будет эффективно распараллеливать свою работу на сотнях имеющихся в наличии вычислительных блоках, очень непросто. А от не адаптированного алгоритма толку на подобном суперкомпьютере не будет – скорость его выполнения будет не намного выше, чем на обычном PC.
Но программирование суперкомпьютеров и создание эффективных параллельных алгоритмов – это отдельная и очень сложная тема. Ей посвящено множество научных исследований, имеется большое количество интересных статей и книг. Правда, следует признать, что эффективного и простого способа создания параллельных программ до сих пор не существует. В этой статье мы затронем другую тему – использование параллелизма в обычных приложениях.
Ещё несколько лет назад создатели прикладных программ вполне обходились без параллелизма на уровне приложения. Как устроена типичная простая программа под Windows – есть цикл обработки событий (event loop), внутри которого реализована вся логика взаимодействия программы с пользователем. Нажал пользователь клавишу Enter – программе пришло соответствующее событие и вызвалась функция-обработчик для этого события. Эта функция выполнила некоторые действия (например, сохранила введённые пользователем данные и вывела на экран новую форму) и вернула управление в основной цикл. После чего программа ждёт прихода следующего события. Всё просто, красиво, удобно для программирования и… неудобно для пользователя. Дело в том, что не все операции программа может выполнить мгновенно. Например, копирование файла на дискетку занимает достаточно много времени. Поскольку в это время программа не готова к приёму следующих событий, она просто не реагирует на действия пользователя. Максимум, что можно сделать – это сменить тип курсора на часики и вывести окошко с сообщением: "подождите минутку". А зачастую пользователю остаётся только гадать – "зависла" программа или всё-таки она что-то делает и стоит немного подождать (опытные пользователи способны определить это по «косвенным признакам» - загрузке CPU, активности диска…). А в чём заключается причина проблемы? В том, что программа не в состоянии выполнять параллельно две операции: копирование файла и обработку пользовательских событий.
Другой пример. Рассмотрим создание сёрверных приложений. Классический сценарий - с каждым клиентом сёрвер устанавливает соединение по сокету (socket); с помощью системного вызова select программа определяет, по какому из сокетов получены данные. Дальше - цикл, который вызывает подпрограмму-агента для обработки запроса от данного клиента. Структура программы получается достаточно простой, но проблемы возникают и здесь. Во-первых, проблема с эффективным использованием ресурсов сёрвера: пока выполняется один запрос, остальные ждут. Во-вторых, если выполнение запроса требует много времени, то время реакции сёрвера оказывается неприемлемым. Не удаётся равномерно загрузить все ресурсы сёрвера и за счёт этого повысить производительность. Если обработчик запроса, например, читает данные с диска, то CPU при этом обычно простаивает, а хотелось бы использовать его для обработки других запросов, не требующих доступа к диску.
Другая проблема связана с тем, что часто для клиента необходимо помнить контекст, т.е. обработка текущего запроса клиента зависит от результата предыдущих запросов. В принципе ничего страшного в этом нет: мы можем завести специальную структуру данных и запоминать состояние для каждого клиента. Получается этакий конечный автомат, рёбра которого соответствуют приходящим от клиента запросам. Конечный автомат – очень гибкий и мощный механизм, только вот программировать его – занятие не из приятных. Приходится постоянно помнить, кто у нас в каком состоянии и куда он потом может попасть. Опять таки сохранение и получение текущего состояния не способствует компактности и простоте кода программы. Было бы гораздо проще, если бы с каждым клиентом был связан отдельный поток, в котором хранятся все данные этого клиента и который может выполняться параллельно с потоками для других клиентов.
2. Параллелизм с точки зрения ОС
Параллельные программы появились достаточно давно. Даже такие примитивные системы как MS-DOS, были способны параллельно выполнять несколько программ. Конечно, если в машине всего один процессор, то полного параллелизма, конечно, достичь не удастся. Процессор в каждый момент времени может выполнять команды только одной программы. Тут уже задача операционной системы обеспечить правильное планирование выполнения различных программ, так чтобы и различные устройства (диск, процессор) использовались эффективно, и каждой программе «честно» бы выделялся свой квант времени.
Классический алгоритм планирования выполнения программ использует одну или несколько очередей с задачами, готовыми к выполнению. Каждой задаче назначается свой приоритет выполнения. Обычно интерактивным задачам (задачам, взаимодействующим с пользователями) следует назначать более высокий приоритет по сравнению с преимущественно вычислительными задачами. Также высокий приоритет нужен критичным по времени задачам, таким, как управление записью на компакт диск или проигрыватель музыки. Приоритет программы может меняться в процессе работы – давно находящейся в режиме ожидания задаче можно поднять приоритет, а активно выполняющейся – наоборот снизить. Это способствует более «честному» планированию задач и быстрой реакции на команды пользователя. Планировщик задач выбирает задачу из начала списка наиболее высокоприоритетных задач, выделяет ей квант времени для выполнения и передаёт ей управление. Далее возможно следующее:
Программа полностью отрабатывает свой квант времени. Тогда планировщик заносит её в конец списка готовых к выполнению задач и выбирает для выполнения следующую задачу с наибольшим приоритетом.
Программа инициирует операцию, которая не может быть немедленно выполнена (например, чтение с диска). В этом случае планировщик переносит задачу в список ждущих задач и выбирает следующую, готовую для выполнения. По окончании операции задача будет опять помещена в список готовых к выполнению.
Происходит системное прерывание или появляется задача с большим приоритетом, готовая к выполнению. Планировщик определяет, может ли программа продолжить работу или следует запустить более высокоприоритетный процесс.
Подобная схема ещё называется «преимущественной многозадачностью» (preemptive multitasking). При такой схеме никакая задача не может полностью блокировать выполнение задач с таким же или большим приоритетом. Некоторые старые ОС (например, Windows 95) имели более простую в реализации модель параллелизма (называемую «не выталкивающей многозадачностью»), когда перепланирование процессов происходит только при желании активного процесса. Т.е. задача должна сама информировать ОС, что её можно прервать. Естественно не правильно работающая программа при такой схеме может полностью блокировать работу системы, что и является следствием гораздо менее стабильной работы Win 95, по сравнению, например, с семейством Windows NT.
3. Поток – что это такое?
Итак, параллельные процессы используются уже достаточно давно. Почему же их нельзя использовать для решения наших задач? Ответ: можно, но неудобно. Дело в том, что процесс представляет собой достаточно замкнутую систему. У процесса есть своя память (команд и данных), свой стек, свои дескрипторы для доступа к файлам и другим устройствам и т.д. При этом средства взаимодействия процессов между собой достаточно ограничены предоставляемым ОС API. Обычно средства межпроцессорного взаимодействия включают в себя очереди для обмена сообщениями между процессами, разделяемую память (shared memory) и синхронизационные примитивы: семафоры, события, критические секции. Т.е. если вы хотите выполнить фоновое сохранение данных на диск, вам нужно будет создать новый процесс, использовать один из существующих механизмов межпроцессорного взаимодействия (IPC) для передачи этому процессу имени файла и данных, которые должны быть сохранены, после этого, опять-таки с использованием IPC, получить от процесса уведомление о завершении операции. В общем, простейшая операция выливается во множество строчек кода.
Проблема ещё состоит в том, что, так как с процессом связано достаточно много данных, то и порождение процесса и переключение контекста процессов представляет собой достаточно трудоёмкую операцию. Таким образом, использовать различные процессы для распараллеливания работы можно только если эти работы слабо связаны с друг другом. В этом случае процессы могут работать почти независимо друг от друга. Если же для выполнения необходим интенсивный обмен данными, то накладные расходы от межпроцессного взаимодействия и переключения контекстов часто оказываются настолько большими, что всякий выигрыш от параллельной работы процессов теряется.
Поэтому не удивительно, что почти во всех современных операционных системах поддерживаются облегчённые процессы или потоки (threads). На самом деле реализации потоков в разных ОС может существенно отличаться. Но мы не будем здесь заострять на этом внимание. Определим поток как самостоятельную активность внутри процесса. Поток имеет свой стек, но не имеет своей собственной памяти. Вместо этого все потоки в процессе разделяют общую память (т.е. один поток может получить доступ к данным другого потока). Это значительно упрощает передачу данных между потоками (механизм передачи данных ничем не отличается от передачи параметров в подпрограмму). Но при этом, в отличие от процесса, данные которого изолированы от других процессов, данные потока могут быть изменены любым другим потоком, что, конечно, требует повышенного внимания при программировании многопоточных приложений и аккуратного использования синхронизации (об этом позже).
За счёт того, что у потока гораздо меньше собственных ресурсов, чем у процесса, создание потока требует гораздо меньше времени, чем запуск процесса. Также переключение контекста между различными потоками в рамках одного процесса выполняется гораздо быстрее, чем переключение между различными процессами (все потоки внутри одного процесса имеют общую память, поэтому при переключении контекста не надо менять таблицу отображения страниц).
Планирование выполнения потоков система осуществляет точно так же, как планирование процессов. Точнее, во многих операционных системах, поддерживающих потоки, единицей планирования является именно поток, а не процесс. Важно отметить, что операционная система способна организовать параллельное выполнение потоков: либо распределением потоков по различным процессорным устройствам (на многопроцессорной машине), либо используя перепланировку по истечении отведённого потоку кванта времени. В обоих случаях надо быть готовым к тому, что выполнение потока может быть прервано в любой момент и управление передано другому потоку (это не совсем так – есть понятие атомарной, т.е. неделимой операции, но какая операция атомарная, а какая нет, зависит от архитектуры системы, используемого языка, типа данных и даже опций компилятора, по этому мы не будем здесь это обсуждать).
4. Потоки в Java – как всё просто!
Механизм потоков в Java использует усовершенствованную схему мониторов Хоара. Потоку соответствует класс java.lang.Thread. Для создания своего потока можно либо вывести свой класс из Thread, либо реализовать интерфейс Runnable. И в том, и в другом случае необходимо реализовать собственный метод run(), который, собственно, и будет выполнять нужное действие. Запустить поток на выполнение можно методом start(). Если необходимо дождаться завершения потока, надо использовать метод join(). Ниже приведены примеры для двух способов создания потока:
class MyThread extendsThread {
publicvoid run() {
System.out.println("Thread is running");
}
}
publicstaticvoid main(String args[]) {
MyThread thread = new MyThread();
thread.start();
try {
thread.join();
} catch (InterruptedException x) {}}
А что будет, если не дожидаться завершения потока? Программа будет активна, пока работает по крайней мере один поток. Т.е., другими словами, программа ждёт завершения всех запущенных потоков. Иногда это не нужно. В этом случае надо пометить поток как «демон» с помощью метода setDaemon(true). Завершения "демонов" программа не дожидается и просто их останавливает. Вызвать setDaemon метод нужно до запуска потока.
Ну, хорошо – потоки мы запускать научились. Всё – задача решена? Нет, на самом деле это только начало. Как мы уже говорили, все потоки разделяют общую память. Посмотрим, чем это чревато. Допустим, мы пишем банковскую систему для обслуживания банкоматов, и у нас есть метод, который проверяет, достаточно ли у клиента денег на счету и, если да, снимает запрошенную сумму со счёта и даёт банкомату "добро" на выдачу денег:
Так как банкоматов у нас много, мы решаем реализовать параллельную обработку запросов и для этого используем потоки. Вроде бы всё прекрасно работает (в этом и заключается одна из основных проблем параллельного программирования – ошибки очень трудно воспроизводимы). Теперь давайте внимательно проанализируем код. Итак, метод withdraw() может параллельно выполняться несколькими потоками. Как мы уже отмечали, поток может быть прерван в любой момент времени. Поэтому возможет такой сценарий:
Имеется два потока П1 и П2, в которых вызывается метод withdraw() Пусть на счету находится сумма в 100уе, П1 запрашивает 80уе, а П2 – 60уе.
Происходит перепланировка потоком, управление получает поток П2
Поток П2 проверяет условие (amount < balance). Оно тоже истинно (60 < 100).
Поток П2 уменьшает баланс на затребованную величину и возвращает true. Теперь баланс равен 40уе.
Управление возвращается потоку П1.
Он тоже уменьшает значение баланса и возвращает true. Баланс равен… -40уе!
Н-да, так с деньгами обращаться нельзя. Параллелизм - это конечно здорово, но целостность данных от этого страдать не должна. Кусок кода программы, в котором недопустимо влияние других потоков, называется критической секцией. В критическую секцию дозволено входить только одному потоку. Остальные будут ждать, пока этот поток не покинет критическую секцию. В языке Java для оформления критической секции предусмотрено ключевое слово synchronized(синхронизированный). Можно объявить синхронизированными как весь метод, так и отдельный блок операторов:
В данном случае оба способа эквивалентны. Первый представляется более удобным, а второй обеспечивает большую гибкость.
Что означает аргумент this в конструкции synchronized(this)? В Java с каждым объектом неявно связан монитор – т.е. нечто, отвечающее за синхронизацию доступа к данному объекту. Вся синхронизация в Java осуществляется на уровне объектов. Т.е. внутри synchronized блока блокируется данный экземпляр объекта. Метод, помеченный как synchronized, осуществляет блокировку this объекта для методов экземпляра класса или блокировку самого класса для статических методов. Т.е объявляя метод как synchronized, мы говорим системе, что этот метод требует эксклюзивного доступа к объекту. И система гарантирует, что из всех синхронизированных участков кода, для каждого конкретного экземпляра объекта в каждый момент времени может выполняться только один такой участок. При этом:
Синхронизированный метод может выполняться параллельно для разных объектов, например – метод withdraw может параллельно производить операции с различными счетами.
Для одного экземпляра запрещено параллельное выполнение не только одного и того же синхронизированного метода, но и любых других методов и синхронизированных блоков, использующих данный экземпляр объекта. Например, если бы у нас в классе Account был синхронизированный метод deposit (положить деньги на счёт), то и он бы не мог выполняться параллельно с методом withdraw для данного счёта.
Конструкция synchronized не препятствует параллельному выполнению методов, не отмеченных как синхронизированные. Т.е., если бы в нашем классе Account был метод deposit(), который мы забыли бы пометить как synchronized, то он вполне мог выполняться параллельно с методом withdraw(). Причём, последствия могли бы быть не менее печальными, чем при выполнении двух параллельных withdraw().
Итак, мы научились предотвращать нежелательное влияние потоков друг на друга. Но этого ещё недостаточно. Часто хочется уметь оповещать другой поток, например, о готовности данных для него. И, соответственно, ждать прихода такого сообщения. Для этого в Java в классе Object предусмотрены методы wait, notify, notifyAll. Для использования этих методов поток должен быть эксклюзивным владельцем объекта, т.е. использовать эти методы для this объекта внутри метода, объявленного как synchronized, или внутри synchronized блока с этим объектом в качестве параметра. Эти методы работают следующим образом:
При выполнении метода wait система снимает блокировку с объекта и переводит поток в режим ожидания.
При выполнении метода notify система переводит в точности один поток, ожидающий наступления данного события, в режим готовности. Если такого потока нет, то выполнение этого метода не имеет никакого эффекта. Следует заметить, что если наступления события ожидают несколько потоков, то будет выбран любой из них. При этом, для возврата из метода wait, потоку придётся конкурировать за доступ к данному объекту с другими потоками. Только установив вновь эксклюзивную блокировку объекта, ожидающий поток может вновь продолжить выполнение.
При выполнении метода notifyAll пробуждаются все потоки, ждущие наступления данного события.
Кажется не совсем понятным, зачем требуется блокировать объект перед вызовом wait, чтобы потом wait снял эту блокировку и пытался установить её вновь после получения уведомления. Но это сделано для того, чтобы проверка условия и переход в режим ожидания выполнялись атомарно (не делимым образом). Обычно поток, ожидающий наступления какого-то события, проверяет переменную, связанную с этим событием. Если результат проверки отрицательный, то поток переходит в режим ожидания. Например, простейший класс "событие" (event), можно реализовать на Java следующим образом:
class AutoResetEvent { // событие с автосбросом
publicsynchronizedvoid waitEvent() {
try {
while (!signaled) { // ждать наступления события
wait();
}
signaled = false;
} catch (InterruptedException x) {}
}
publicsynchronizedvoid signalEvent() {
signaled = true;
notify(); // пробудить спящий поток
}
privateboolean signaled;
}
Обратите внимание, что проверка условия signaled делается в цикле. Это необходимо, т.к., после того как signaled был установлен в true методом signal и ждущий поток помещён в очередь готовых к выполнению процессов, поток, вызвавший wait, ещё не стал владельцем блокировки объекта. Поэтому, если в данный момент другой поток вызовет метод waitEvent, вполне может быть, что именно он завладеет блокировкой и продолжит выполнение. Так как переменная signaled имеет значение true, то этот поток не будет ждать, а сбросит signaled в false и продолжит выполнение. А поток, выполнивший wait и завладевший наконец блокировкой, обнаружит, что signaled сброшен. Поэтому он вынужден будет выполнить ещё одну итерацию цикла и опять ждать наступления следующего события.
Следует также заметить, что метод wait снимает блокировку только с одного объекта (а именно с того, для которого был позван метод wait). Если вы заблокировали какие-либо еще объекты, то они так и останутся заблокированными на время, пока поток находится в состоянии ожидания. Возможно, вы того и хотели, но чаще всего это свидетельствует об ошибке в программе. Для повышения степени параллелизма (и, соответственно, производительности) и избежания тупиковых ситуаций следует устанавливать блокировки на как можно более короткий срок. Следующий пример иллюстрирует проблему:
В этом примере метод wait снимает блокировку с объекта event, но собственный объект (this) остаётся заблокированным.
А вот пример реализации на Java события с ручным (явным) сбросом:
class ManualResetEvent { // событие с автосбросом
publicsynchronizedvoid waitEvent() {
try {
while (!signaled) { // ждать наступления события
wait();
}
} catch (InterruptedException x) {}
}
publicsynchronizedvoid resetEvent() {
signaled = false;
}
publicsynchronizedvoid signalEvent() {
signaled = true;
notifyAll(); // пробудить все спящие потоки
}
privateboolean signaled;
}
И еще один пример - реализация семафора на Java. Семафор - это классический синхронизационный примитив, предложенный Э.В. Дейкстрой, который можно использовать для распределения некоторого ограниченного количества ресурсов. У классического семафора есть две операции P(занять ресурс) и V(освободить ресурс). Операция P проверяет счётчик доступных ресурсов и, если он больше нуля, то вычитает единицу и возвращает управление. В противном случае поток, выполняющий операцию P, блокируется до тех пор, пока счётчик не станет больше нуля. А операция V используется для оповещения о том, что ресурс потоку больше не нужен. При этом счётчик доступных ресурсов увеличивается на единицу.
Реализация потоков в Java зависит от используемой виртуальной машины. В некоторых случаях используется реализация потоков на библиотечном уровне. То есть всё управление потоками осуществляется внутри самой виртуальной машины без участия операционной системы. Такие потоки имеют наименьшие накладные расходы, так как для переключения контекста не требуется системных вызовов. Но такая реализация не позволяет использовать все имеющиеся ресурсы на многопроцессорной машине и параллельно выполнять несколько потоков. Кроме того, в большинстве случаев такая реализация использует не вытесняющую стратегию для перепланировки потоков. То есть, для того, чтобы управление было передано другому потоку, активный в данный момент поток должен сам захотеть этого (например, вызов блокирующего системного метода приводит к тому, что поток переводится в очередь ждущих потоков, а из очереди готовых к выполнению потоков выбирается новый поток). Поэтому большинство текущих реализаций виртуальных машин использует системные потоки (например, Win32 или pthread) для реализации над ними потоков Java.
Вот, собственно, и всё - мы ознакомились с реализацией параллелизма в Java! Простота конструкций, используемых в Java для управления потоками, создаёт иллюзию, что написание многопоточных приложений ничуть не сложнее написания обычных приложений: расставил всюду synchronized, вставил где необходимо wait/notify, создал нужное количество потоков и всё - многопоточная программа готова. По сравнения с библиотеками, используемыми, например, в С++ для создания и управления потоками, в Java всё кажется простым и понятным. К сожалению, эта простота кажущаяся. Проблемы, возникающие при написании параллельных программ, никуда не исчезли и ждут своего часа. Но об этом в следующей главе.
5. Почему так сложно писать и, главное, отлаживать параллельные программы?
Вернёмся немного назад. Как приходилось отлаживать программы на С/C++? "Гуляющие указатели", не инициализированные переменные, содержащие мусор, висящие ссылки и утечки памяти, не отлавливаемые выходы индекса за границу массива… Программа "грохающаяся" в непредсказуемом месте, потому что совсем в другом участке программы кто-то неправильно обошёлся с указателем. Программа то работающая, то не работающая (в зависимости от того, какой мусор встретился в памяти. Не даром для С++ создано столько всяких СodeGuardов и BoundsCheckerов. После этого ошибки в программах на языке Java кажутся чуть ли не самоустраняющимися - все переменные инициализированы, выход индекса за границу массива отслеживается и пресекается мгновенно, гуляющих и висячих указателей нет в принципе, как и утечек памяти. Программа ведёт себе абсолютно детерминированным образом - если она "сваливалась" на каком-то определённом наборе входных данных, то можно быть уверенным, что, будучи запущенной ещё раз с тем же набором данных, мы получим тот же самый результат. В общем, полное торжество защищённых программных систем и языков.
Но вот мы написали многопоточную программу. И что мы видим? Опять программа ведёт себя абсолютно непредсказуемым образом. Будучи отлажена, оттестирована и запущена 1000 раз, на тысячу первый она ни с того ни с сего виснет. Или ещё хуже - вдруг оказываются "испорченными" данные. Попытка воспроизвести ошибку ни к чему не приводит - программа отлично работает...до следующего падения. Случаются и не столь катастрофические, но не менее загадочные явления - программа, прекрасно работающая на однопроцессорной машине, на вдвое более мощной двухпроцессорной машине вдруг начинает работать на порядок (десятичный, а не двоичный) медленнее, чем на однопроцессорной. В общем, мы опять оказываемся в первобытном дремучем лесу отладки недетерминировано работающей программы. С той лишь разницей, что продуктов типа BoundsCheckerа для поиска ошибок в синхронизации очень мало, да и качество их работы оставляет желать лучшего.
На самом деле, все ошибки, приводящие к неправильной работе многопоточного приложения можно разбить на два класса: конкурентный доступ (английский термин "race condition" не поддаётся разумному переводу) и взаимные блокировки. Ошибки первого рода являются следствием недостатка синхронизации, второго - её избытка (или неправильного применения).
С ошибками первого рода мы уже сталкивались - пример с банковским счётом. Если два или больше потоков начинают одновременно изменять (или даже один изменяет, а другой только смотрит) одни и те же данные, то обычно ничего хорошего из этого не получается. Что же делать? Понаставить всюду synchronized? Во-первых, тогда весь выигрыш от многопоточности может испариться - если все объекты доступны только в эксклюзивном режиме, то работать в каждый момент времени сможет только один поток. А во вторых, чрезмерное и необдуманное использование блокировок приводит к проблемам второго рода - тупиковым ситуациям. Рассмотрим следующий пример:
class Pipe { // канал
Queue src; // источник
Queue dst; // приёмник
void forward() { // переметить элемент из источника в приёмник
synchronized(src) { // блокируем приёмник
synchronized(dst) { // блокируем приёмник
Object elem = src.dequeue(); // взять элемент из очереди src
dst.enqueue(elem); // и поместить в очередь dst
}
}
}
void backward() { // переместить элемент в обратном направлении –
// из приёмника в источник
synchronized(dst) { // блокируем приёмник
synchronized(src) { // блокируем приёмник
Object elem = dst.dequeue(); // взять элемент из очереди dst
src.enqueue(elem); // и поместить в очередь src
}
}
}
}
Допустим, что с данным классом параллельно работают несколько потоков. При этом возможен такой сценарий:
Поток П1 вызывает метод forward.
В потоке П1 выполняется первый synchronized оператор и устанавливается блокировка src.
В результате перепланировки потоков управление получает поток П2.
Поток П2 вызывает метод backward.
В результате выполнения метода backward в потоке П2 происходит блокирование объекта dst.
Попытка блокировать объект src не удаётся, так как этот объект уже блокирован П1. Поток П2 переходит в режим ожидания.
Управление вновь получает П1 и пытается блокировать dst. И эта попытка оканчивается неудачей, так как dst уже блокирован П2. Потоку П1 ничего не остаётся, как ждать...потока П2, который в свою очередь ждёт П1. Выйти из этой ситуации потоки не смогут. Поэтому она и зовётся тупиком.
В данном случае проблема решается достаточно просто: нужно, чтобы метод backward блокировал объекты в том же порядке, что и метод forward. Тупиковая ситуация при этом возникнуть не может. Этот простой прием, кстати, является одним из самых основных способов предотвращения тупиковых ситуаций - старайтесь всегда блокировать объекты в одном и том же порядке. Можно также ассоциировать с объектами-ресурсами приоритеты, и блокировать объекты в порядке убывания приоритета (иерархические блокировки). К сожалению, тупиковую ситуацию можно получить, даже если у вас есть один единственный метод с synchronized атрибутом. Рассмотрим следующий пример:
class SomeClass {
voidsynchronized foo(SomeClass a) {
...
a.foo();
...
}
}
Теперь допустим, что у нас есть объекты класса SomeClass о1 и о2 и два потока П1 и П2, параллельно выполняющие следующие вызовы:
П1: o1.foo(o2);
П2: o2.foo(o1);
C большой вероятностью этот код приведёт к взаимной блокировке: П1 блокирует о1, П2 блокирует о2 и оба будут ждать друг друга).
Что же делать? Как научиться определять возможные блокировки и пытаться избегать их? К сожалению, Java компилятор нам тут ничем не поможет. Придётся брать в руки карандаш и бумагу (кто там говорил о скорой кончине последней?) и начинать рисовать граф блокировок. Вершинам в этом графе соответствуют объекты, которые могут быть заблокированы (объект синхронизации в synchronized блоке или экземпляр класса для synchronized метода). Конечно, этих объектов может быть неопределённо много (например, в случае synchronized метода, каждый экземпляр объекта данного класса может быть заблокирован). Для начала, представим все экземпляры класса одной вершиной. Итак, вершины мы нарисовали. Теперь внимательно изучаем код и ищем зависимости между объектами. Для этого строим граф вызовов, т.е. определяем, какие другие методы может позвать данный метод. Нужно построить замыкание этого отношения (т.е. иными словами учесть не только те методы, которые могут быть непосредственно позваны, но и методы, которые позовут эти методы, и.т.д.) Не забудьте учесть наследование - в общем случае вызов метода базового класса или интерфейса может привести к вызову метода любого выведенного класса, переопределяющего или реализующего данный метод. Граф вызовов для большой программы может получиться просто непотребно большим, поэтому лучше его не рисовать явно, а строить в уме. Но тут главное никого не забыть. После того, как мы выяснили, кто кого может позвать, проводим рёбра в нашем графе блокировок. Если внутри участка кода, блокировавшего объект "x", блокируется объект "y" или вызывается метод, который в свою очередь блокирует объект "y", то от вершины, помеченной "x", мы проводим ребро в вершину помеченную "y". Провели? Ну, значит, дело почти сделано. Теперь изучаем наш граф и ищем в нём циклы. Не нашли - поздравляю: возникновение тупиковой ситуации в вашей программе невозможно (если вы, конечно, не напутали что-то с графом). Нашли? - ну тут возможны варианты. Помните, что мы все экземпляры данного класса пометили одной единственной вершиной? Может так оказаться, что цикл, существующей в нашем графе на самом деле невозможен, потому что блокируются различные экземпляры объектов. Тут уже нужно более тонкое изучение, использующее семантику конкретной программы.
Возникает вопрос: нельзя ли автоматизировать описанную выше процедуру, чтобы не рисовать сами граф карандашом, а предоставить построение графа и поиск циклов в нём программе. Ответ - да, можно. Хотя подобных продуктов существует не так уж много. Один из них Jlint написан мной и его можно свободно скачать с моего сайта http://www.garret.ru/~knizhnik.
К сожалению, моя текущая работа в фирме Борланд тоже связана с верификацией кода, поэтому по соображениям корпоративной этики я прекратил дальнейшую разработку и совершенствование данного продукта. Ещё необходимо заметить, что реализовать описанный выше алгоритм не так просто, как кажется на первый взгляд. Для этого надо построить не просто граф вызовов, а контекстно-зависимый граф вызовов. А это в большом проекте может занять слишком много памяти и времени. Кроме того, из-за представления в графе всех экземпляров объектов одной вершиной, достаточно велик процент "белого шума" - т.е. ложных сообщений о блокировках, что делает даже анализ найденных циклов для большого проекта нетривиальной задачей.
К счастью, в отличие от конкурентного доступа, тупиковая ситуация (будучи воспроизведённой под отладчиком), хорошо поддаётся анализу. Просто смотрим список "застрявших" потоков, изучаем стек вызова каждого такого потока и определяем, какие объекты данный поток блокировал и какие пытается заблокировать. После чего надо только понять, как разрушить цикл.
К сожалению, для поиска ошибок связанных с конкурентным доступом сложно дать какие-то рекомендации. Очевидно, что каждые разделяемые и изменяемые данные должны быть защищены от конкурентного доступа. Но как именно это сделать - решать программисту. Как обнаружить места, где это забыли сделать или сделали неправильно? Могу предложить только очень общий способ. Очевидно, что синхронизация имеет смысл только в том случае, если все потоки используют для синхронизации одни и те же объекты. Если поток П1 для обращения к объекту o блокировал объект m1, а поток П2 для обращения к тому же объекту о использует блокировку другого объекта m2, то ничего хорошего от такой "синхронизации" не получится. Поэтому можете попробовать для каждого совместно используемого объекта или переменной определить объект, который отвечает за синхронизацию доступа к нему. Если у вас получилось больше одного синхронизирующего объекта для какого-то совместно используемого элемента - то, скорее всего, тут ошибка. И ещё одна рекомендация по воспроизведению ошибок. Хотя, как мы уже говорили, даже на однопроцессорной машине поток может быть прерван в любой момент и поэтому возможно параллельное выполнение почти любых операторов (если только они не находятся в критической секции), на многопроцессорной машине вероятность этого значительно больше. Кроме того, тут так же сказывается параллелизм на уровне инструкций - инструкции, который могли рассматриваться как атомарные в однопроцессорной конфигурации, перестают быть такими в многопроцессорной системе. Поэтому постарайтесь прогнать свою программу на многопроцессорной системе - возможно гораздо быстрее наткнётесь на скрытые ошибки. Кроме того, при прогоне на многопроцессорной машине могут проявиться проблемы с падением производительности, вызванные конфликтами блокировок разных потоков. В этом случае придётся либо менять схему блокировок, либо изменять распределение данных между потоками, чтобы свести к минимуму использование глобальных (разделяемых) данных, доступ к которым требует синхронизации.
6. Проблема остановки.
Как мы уже видели, запуск потока в Java не представляет собой проблемы – достаточно создать объект Thread и позвать метод start. Однако, оказывается, что, запустив поток на выполнение, его не так-то просто остановить. Хотя метод stop в классе Thread есть, но он помечен как deprecated (устаревший). Т.е. пользоваться им настоятельно не рекомендуется. И на то есть веские причины. Дело в том, что поток, остановленный в произвольный момент времени, может оставить в некорректном состоянии системные ресурсы. Например, если поток был владельцем каких либо блокировок, то эти объекты так и окажутся не разблокированными. Что же делать, если «убивать» поток нельзя? Надо заставить его совершить «харакири» - т.е. завершиться добровольно. Для этого в потоках, которые выполняют некоторое циклическое действие, в условие цикла надо поставить проверку того, что потоку не пора завершать работу. Например:
class MyThread extendsThread {
boolean running = true;
publicvoid run() {
while (running) {
... // делаем что-то
}
}
}
Однако обычно поток не только производит вычисления, но и совершает операции ввода/вывода. Соответственно очень часто поток блокирован в результате выполнения системного вызова, который не может быть выполнен мгновенно – например, read. Чтобы заставить такой поток закончить выполнение, надо воспользоваться методом interrupt() класса Thread. Если поток был блокирован во время выполнения метода Object.wait(), то в потоке будет инициирована исключительная ситуация InterruptedException. Если поток выполнял блокирующую операцию ввода/вывода, то выполнение операции будет прервано c помощью исключительной ситуации java.nio.channels.ClosedByInterruptException. При этом всё равно полезно использовать флаг остановки, чтобы отличать ситуацию, когда операция прервана в результате ошибки ввода/вывода и когда причина – остановка потока.
Иногда в потоке стоит задержка выполнения на некоторое время. В классе Thread имеется метод Sleep, который "усыпляет" поток на заданное время в миллисекундах. Но если поток нужно будет останавливать, то более правильным представляется использование метода Object.wait с задержкой. В этом случае объекту можно послать уведомление, чтобы прервать его сон:
class MyThread extendsThread {
boolean running = true;
Object timer = newObject();
staticfinalint DELAY=1000; // одна секунда
publicvoid run() {
while (running) {
... // делаем что-то
synchronized(timer) {
try {
timer.wait(DELAY); // ждём заданное время,
// либо сигнала о завершении
} catch (InterruptedException x) {}
}
}
}
publicvoid terminate() {
synchronized(timer) {
running = false; // устанавливаем флаг завершения
timer.notify(); // и посылаем сигнал, чтобы пробудить спящий процесс.
}
}
}
Хотя поток и является более легковесным ресурсом, чем процесс (т.е. создание и переключение потоков требует меньше времени и памяти), но всё равно создание потока достаточно сложная операция, особенно в операционных системах, поддерживающих потоки на уровне ядра (а только в этом случае можно получить выигрыш от использования параллельных потоков на многопроцессорной машине). Поэтому, если программе необходимо периодически выполнять какие-то параллельные действия, неэффективно каждый раз создавать новый поток. Вместо этого лучше завести пул потоков. Идея очень проста: вместо того, чтобы каждый раз создавать новый поток, а потом его завершать, мы заводим некоторое число готовых потоков, которые ждут момента, когда они понадобятся. После выполнения работы поток не завершается, а опять попадает в список свободных потоков и ждёт следующего задания. Посмотрим, как это можно реализовать:
/**
* Пул потоков
*/
publicclass ThreadPool {
/**
* Получить экземпляр пула потоков
*/
publicstatic ThreadPool getInstance() {
return theInstance;
}
/**
* Поток для повторного использования
*/
staticclass PooledThread extendsThread {
PooledThread next;
Object ready; // объект, используемый для ожидания нового задания
Object done; // объект, используемый для сигнализации завершения работы
boolean busy; // флажок отмечающий, что поток занят
boolean doneNotificationNeeded; // ждёт ли кто-нибудь завершения работы
Runnable task; // что нужно сделать
ThreadPool pool; // пул потоков
publicvoid run() {
try {
synchronized(ready) { // блокируем ready – готовимся к ожиданию работы
while (!pool.closed) { // проверяем не закрыт ли пул
synchronized(done) { // блокировка ready
busy = false; // мы закончили работу начатую на предыдущей
// итерации цикла
if (doneNotificationNeeded) { // если кто-то ждёт завершения работы…
done.notify(); // то пошлём уведомление, что мы её закончили
}
}
ready.wait(); // ждём нового задания
if (task != null) { // если мы его получили…
task.run(); // то выполняем
} else {
break; // иначе завершаем работу
}
}
}
} catch (InterruptedException x) {}
}
finalvoid wakeUp(Runnable t) { // запустить задание на выполнение
synchronized(ready) { // блокировать readу, чтобы можно было послать уведомление
busy = true; // установить флаг занятости
doneNotificationNeeded = false;
task = t;
ready.notify(); // послать уведомление свободному потоку
}
}
finalvoid waitCompletion() throwsInterruptedException {
synchronized(done) { // блокировать done для ожидания
if (busy) { // если задание ещё не завершилось…
doneNotificationNeeded = true; // то поставить флаг ожидания завершения
done.wait(); // и подождать
}
}
}
PooledThread(ThreadPool pool) {
this.pool = pool;
ready = newObject();
done = newObject();
busy = true;
setDaemon(true); // запускаем поток как демона, чтобы не ждать его завершения
// при выходе из программы
this.start();
}
}
/**
* Найти свободный поток и запустить в нём задание на выполнение
* @param task объект реализующий интерфейс Runnable,
* метод run которого будет выполнен потоком
* @return поток, выделенный для данного задания
* (его можно использовать только в методе ThreadPool.join)
*/
publicThread start(Runnable task)
{
PooledThread thread;
synchronized (this) { // для работы со списком нужна блокировка
while (availableThreadList == null) { // если список свободных потоков пуст
try {
if (nActiveThreads == maxThreads) { // и если достигнуто ограничение на
// максимальное число потоков
deficit += 1; // то отметить, что есть задачи, ждущие выполнения
wait(); // и подождать пока какой-нибудь из потоков не освободится
} else {
availableThreadList = new PooledThread(this); // создать новый поток
availableThreadList.waitCompletion(); // и дождаться момента,
// когда он стартует и будет
// готов к получению задания
}
} catch (InterruptedException x) {
returnnull;
}
}
thread = availableThreadList; // взять поток из списка свободных
availableThreadList = thread.next;
nActiveThreads += 1; // увеличит счётчик активных потоков
}
thread.wakeUp(task); // запустить задание на выполнение
return thread;
}
/**
* Дождаться завершения задания. Поток при этом возвращается в список свободных
* @param thread поток, возвращённый методом ThreadPool.start
*/
publicvoid join(Thread thread) throwsInterruptedException
{
PooledThread t = (PooledThread)thread;
t.waitCompletion(); // дождаться завершения задания
synchronized (this) { // для работы со списком нужна блокировка
t.next = availableThreadList; // добавить поток в список свободных
availableThreadList = t;
nActiveThreads -= 1; // уменьшить значение счётчика активных потоков
if (deficit > 0) { // если есть потоки ждущие уведомления…
notify(); // то послать уведомление о том, что поток освободился
deficit -= 1;
}
}
}
/**
* Закрытие пула потоков. Подождать окончания работы всех активных потоков и
* завершить все свободные потоки
*/
publicsynchronizedvoid close()
{
closed = true; // ставим флаг прекращения работы
while (nActiveThreads > 0) { // пока есть активные потоки
try {
deficit += 1; // мы нуждаемся в уведомлении о завершении потока
wait(); // ждём уведомления
} catch (InterruptedException x) {}
}
while (availableThreadList != null) { // пока список свободных потоков не пуст
availableThreadList.wakeUp(null); // потоку запрос на завершение
availableThreadList = availableThreadList.next; // и исключаем его из списка
}
}
/**
* Конструктор пула потоков с ограничением на максимальное число активных потоков
* @param maxThreads максимальное число одновременно работающих потоков.
*/
public ThreadPool(int maxThreads) {
this.maxThreads = maxThreads;
}
/**
* Конструктор пула потоков с неограниченным числом потоков
*/
public ThreadPool() {
this(Integer.MAX_VALUE);
}
PooledThread availableThreadList; // список свободных потоков
int nActiveThreads; // число активных потоков
int deficit; // количество потоков, заинтересованных в получении
// уведомления о завершении работы
int maxThreads; // ограничение на максимальное число одновременно
// работающих потоков
boolean closed; // флаг прекращения работы
static ThreadPool theInstance = new ThreadPool();
}
Обратите внимание в этом примере на то, что уведомления посылаются не всегда, а только тогда, когда их кто-то ждёт. Подобная оптимизация с использованием дополнительной переменной позволяет избежать лишних вызовов notify.
7. Практические примеры использования потоков
Рассмотрим использование потоков в реальных и достаточно простых приложениях. Я буду рассматривать примеры приложений под J2ME (Java для мобильных и встроенных устройств) по причине исключительной простоты MIDP интерфейса. Но способы работы с потоками в J2ME ничем не отличаются от других платформ Java.
Итак, первый пример. Допустим, мы реализуем научный калькулятор, который должен уметь рисовать графики функций. Для этого мы должны вычислить значение функции во всех точках указанного интервала с заданным шагом. Их может быть довольно много, а вычисление функции (особенно на слабеньком процессоре мобильного телефона), может занимать много времени. С другой стороны, пока мы не вычислили значение функции для всех точек, мы не сможем определить максимум и минимум функции на этом интервале и вычислить коэффициент для вертикального масштабирования графика функции (чтобы график поместился на экран). Конечно, можно просто вывести на экран "Подождите минуточку..." и заняться вычислениями. Но это не очень хорошее решение:
Во-первых пользователь не может прервать процесс вычислений (допустим вам в это время звонят, а у вас телефон занят вычислениями).
Во-вторых пользователь не знает сколько ещё времени ему осталось ждать (если бы он знал, что график будут построен только через час, он бы давно остановил работу и задал другой интервал или шаг).
Посмотрим, как это можно сделать. Ясно, что вычисление функции должно производится независимо от работы остальной программы. То есть нам нужен поток:
import javax.microedition.lcdui.*;
import javax.microedition.midlet.*;
publicclass Plot extends Form implements CommandListener, Runnable
{
Calculator calculator;
Gauge progressIndicator; // "градусник"
StringItem completed; // а сюда мы будем записывать строчку с процентом выполнения
Float from; // начальная точка
Float till; // конечная точка
Float dx; // шаг
Float[] values; // сюда мы будем заносить вычисленные значения функции
int percent; // процент выполнения
Thread thread; // поток в котором будет всё делаться
boolean running; // флажок нужный для "аварийной" остановки потока
Compiler.FunctionExpression f; // вычисляемая функция
Plot(Calculator calculator, Compiler.PlotExpression expr, int screenWidth) {
super("Plot");
this.calculator = calculator;
from = expr.from.evaluate(calculator.bindings);
till = expr.till.evaluate(calculator.bindings);
dx = expr.step != null
? expr.step.evaluate(calculator.bindings)
: till.Sub(from).Div(screenWidth);
f = expr.f;
int n = (int)till.Sub(from).Div(dx).toLong() + 1;
if (n < 2) {
calculator.showAlert(AlertType.ERROR, "Error", "Bad interval");
return;
}
values = newFloat[n];
i = 0;
x = from;
progressIndicator = new Gauge("Progress indicator", false, n, 0);
completed = new StringItem("Completed:", "0%");
append(progressIndicator);
append(completed);
setCommandListener(this);
addCommand(Calculator.STOP_CMD);
Display.getDisplay(calculator).setCurrent(this);
running = true;
thread = newThread(this);
thread.start(); // запускаем
}
publicvoid run() {
try {
Float x = from;
// проверяем флажок running, чтобы остановить работу потока при выполнении команды STOP
for (int i = 0, n = values.length; running && i < n; i++) {
f.arguments[0] = x;
// вычисляем значение функции в очередной точке
values[i] = f.evaluate(calculator.bindings);
int newPercent = i*100/n;
// оптимизация: обновляем индикаторы прогресса только при изменении процента выполнения
if (newPercent != percent) {
progressIndicator.setValue(i);
percent = newPercent;
completed.setText(Integer.toString(newPercent) + '%');
}
x = x.Add(dx);
}
} catch (CompileError x) {
calculator.showAlert(AlertType.ERROR, "Error", x.getMessage());
return;
}
if (running) {
// рисуем график, только если не была выполнена команда Stop
new Graph(calculator, from, till, values);
}
}
publicvoid commandAction(Command c, Displayable d) {
if (c == Calculator.STOP_CMD) {
// устанавливаем флажок для завершения потока
running = false;
Display.getDisplay(calculator).setCurrent(calculator.mainMenu);
}
}
}
Теперь рассмотрим пример написания простейшей игрушки. Допустим, у нас должен кто-то (для определённости назовём его НЛО) перемещаться по экрану с заданной скоростью. Рисование в MIDP выполняется в переопределённом методе paint(Graphics g) класса Canvas. Чтобы инициировать его выполнения нужно с помощью метода repaint попросить систему перерисовать какую-то область экрана (или весь экран). Т.е. если у нас НЛО ползёт по экрану, нужно попросить перерисовать прямоугольник, в котором находится НЛО. Дальше мы стираем старый образ НЛО, вычисляем его новые координаты и рисуем его на новом месте. Осталась самая малость - кто-то должен периодически (с периодом определяемым скоростью движения НЛО) вызывать метод repaint. Можно конечно для этого завести поток, но к счастью есть и более простое решение - использовать класс Timer:
import javax.microedition.lcdui.*;
import java.util.Timer;
import java.util.TimerTask;
publicclass GameCanvas extendsCanvasimplements CommandListener {
class GameTask extends TimerTask {
publicvoid run() {
repaint(); // запрос на перерисовку
}
}
GameCanvas(Game game) {
// вычисляем интервал перерисовки (скорость движения НЛО) в зависимости от уровня
delay = MAX_DELAY - game.settings.level*MAX_DELAY/Settings.LEVELS;
// создаём таймер
timer = newTimer();
// и объект котрый будет запускаться таймером
task = new GameTask();
...
}
// запустить игру
void start() {
// запускаем таймер (с нулевой задержкой и заданным интервалом в миллисекундах)
timer.schedule(task, 0, delay);
}
// приостановить игру
void stop() {
// остановить таймер
timer.cancel();
}
...
staticfinallong MAX_DELAY = 1000;
GameTask task;
Timer timer;
long delay;
}
Следует заметить, что внутри таймер реализован с помощью потока, который осуществляет задержку с помощью метода wait(long delay).
Заключение
Возможно, после прочтения главы 5 у вас возникло чувство, что многопоточность это очень сложная и опасная штука, поэтому её лучше не использовать. Это не так. Создание параллельных программ действительно сложная задача, но, похоже, без этого сейчас просто не возможно обойтись. И не всё так плохо. Серьёзные проблемы с управлением потоками возникают в основном в сложных приложениях (обычно сёрверных), где для достижения максимальной производительности и маштабируемости приходится бороться за максимальное распараллеливание этапов выполнения запросов. Если вам в вашем приложении просто нужно выполнить некоторое действие в фоновом режиме, то, возможно, вы и не встретитесь ни с одной из описанных мной проблем. Но будьте бдительны: ошибки в синхронизации – самые коварные.