Рейтинговые книги
Читем онлайн Операционная система UNIX - Андрей Робачевский

Шрифт:

-
+

Интервал:

-
+

Закладка:

Сделать
1 ... 88 89 90 91 92 93 94 95 96 ... 130

Передача данных внутри потока осуществляется асинхронно и не может блокировать процесс. Блокирование процесса возможно только при передаче данных между процессом и головным модулем. Таким образом, функции обработки данных потока — xxput() и xxservice() не могут блокироваться. Если процедура xxput() не может передать данные следующему модулю, она помещает сообщение в собственную очередь, откуда оно может быть передано позже процедурой xxservice(). Если и процедура xxservice() не может осуществить передачу сообщения, например, из-за переполнения очереди следующего модуля, она не будет ожидать изменения ситуации, а вернет сообщение обратно в собственную очередь и завершит выполнение. Попытка передачи повторится, когда ядро через некоторое время опять запустит xxservice().

Процедура xxservice() вызывается в системном контексте, а не в контексте процесса, который инициировал передачу данных. Таким образом, блокирование процедуры xxservice() может заблокировать (перевести в состояние сна) независимый процесс, что может привести к непредсказуемым результатам и потому недопустимо. Решение этой проблемы заключается в запрещении процедурам xxput() и xxservice() блокирования своего выполнения.

Блокирование недопустимо и для драйвера. Обычно прием данных драйвером осуществляется с использованием прерываний. Таким образом процедура xxput() вызывается в контексте прерывания и не может блокировать свое выполнение.

Когда процедура xxput() не может передать сообщение следующему модулю, она вызывает функцию putq(9F), имеющую следующий вид:

#include <sys/stream.h>

int putq(queue_t *q, mblk_t *mp);

Функция putq(9F) помещает сообщение mp в очередь q, где сообщение ожидает последующей передачи, и заносит очередь в список очередей, нуждающихся в обработке. Для таких очередей ядро автоматически вызывает процедуру xxservice(). Планирование вызова процедур xxservice() производится функцией ядра runqueues().[59] Функция runqueues() вызывается ядром в двух случаях:

□ Когда какой-либо процесс выполняет операцию ввода/вывода над потоком.

□ Непосредственно перед переходом какого-либо процесса из режима ядра в режим задачи.

Заметим, что планирование обслуживания очередей не связано с конкретным процессом и производится для всей подсистемы STREAMS в целом.

Функция runqueue() производит поиск всех потоков, нуждающихся в обработке очередей. При наличии таковых просматривается список очередей, ожидающих обработки, и для каждой из них вызывается соответствующая функция xxservice(). Каждая процедура xxservice(), в свою очередь, пытается передать все сообщения очереди следующему модулю. Если для каких-либо сообщений это не удается, они остаются в очереди, ожидая следующего вызова runqueue(), после чего процесс повторяется.

Управление передачей данных

Деление процесса передачи данных на два этапа, выполняемых, соответственно, функциями xxput() и xxservice(), позволяет реализовать механизм управления передачей данных.

Как уже упоминалось, обязательной для модуля является лишь функция xxput(). Рассмотрим ситуацию, когда модули потока не содержат процедур xxservice(). В этом случае, проиллюстрированном на рис. 5.19, каждый предыдущий модуль вызывает функцию xxput() следующего, передавая ему сообщение, с помощью функции ядра putnext(9F). Функция xxput() немедленно вызывает putnext(9F) и т.д.:

xxput(queue_t *q, mblk_t *mp) {

 putnext(q, mp);

}

Рис. 5.19. Передача данных без управления потоком

Когда данные достигают драйвера, он передает их непосредственно устройству. Если устройство занято, или драйвер не может немедленно обработать данные, сообщение уничтожается. В данном примере никакого управления потоком не происходит, и очереди сообщений не используются.

Хотя такой вариант может применяться для некоторых драйверов (как правило, для псевдоустройств, например, /dev/null), в общем случае устройство не может быть все время готово к обработке данных, а потеря данных из-за занятости устройства недопустима. Таким образом, в потоке может происходить блокирование передачи данных[60], и эта ситуация не должна приводить к потере сообщений, во избежание которой необходим согласованный между модулями механизм управления потоком. Для этого сообщения обрабатываются и буферизуются в соответствующей очереди модуля, а их передача возлагается на функцию xxservice(), вызываемую ядром автоматически. Для каждой очереди определены две ватерлинии — верхняя и нижняя, которые используются для контроля заполненности очереди. Если число сообщений превышает верхнюю ватерлинию, очередь считается переполненной, и передача сообщений блокируется, пока их число не станет меньше нижней ватерлинии.

Рассмотрим пример потока, модули 1 и 3 которого поддерживают управление потоком данных, а модуль 2 — нет. Другими словами, модуль 2 не имеет процедуры xxservice(). Когда сообщение достигает модуля 3, вызывается его функция xxput(). После необходимой обработки сообщения, оно помещается в очередь модуля 3 с помощью функции putq(9F). Если при этом число сообщений в очереди превышает верхнюю ватерлинию, putq(9F) устанавливает специальный флаг, сигнализирующий о том, что очередь переполнена:

mod1put(queue_t* q, mblk_t* mp) {

 /* Необходимая обработка сообщения */

 ...

 putq(q, mp);

}

Через некоторое время ядро автоматически запускает процедуру xxservice() модуля 3. Для каждого сообщения очереди xxput() вызывает функцию canput(9F), которая проверяет заполненность очереди следующего по потоку модуля. Функция canput(9F) имеет вид:

#include <sys/stream.h>

int canput(queue_t* q);

Заметим, что canput(9F) проверяет заполненность очереди следующего модуля, реализующего механизм управления передачей данных, т.е. производящего обработку очереди с помощью процедуры xxservice(). В противном случае, как уже говорилось, очередь модуля не принимает участия в передаче данных. В нашем примере, canput(9F) проверит заполненность очереди записи модуля 1. Функция возвращает истинное значение, если очередь может принять сообщение, и ложное — в противном случае. В зависимости от результата проверки процедура xxservice() либо передаст сообщение следующему модулю (в нашем примере — модулю 2, который после необходимой обработки сразу же передаст его модулю 1), либо вернет сообщение обратно в очередь, если следующая очередь переполнена.

Описанная схема показана на рис. 5.20. Ниже приведен скелет процедуры xxservice() модуля 3, иллюстрирующий описанный алгоритм передачи сообщений с использованием механизма управления передачей данных.

Рис. 5.20. Управление потоком данных

mod1service(queue_t *q) {

 mblk_t* mp;

 while ((mp = getq(q)) != NULL) {

 if (canput(q->q_next))

  putnext(q, mp);

 else {

  putbq(q, mp);

  break;

 }

}

В этом примере функция getq(9F) используется для извлечения следующего сообщения из очереди, а функция putbq(9F) — для помещения сообщения в начало очереди. Если модуль 1 блокирует передачу, т.е. canput(9F) вернет "ложно", процедура xxservice() завершает свою работу, и сообщения начинают буферизоваться в очереди модуля 3. При этом очередь временно исключается из списка очередей, ожидающих обработки, и процедура xxservice() для нее вызываться не будет. Данная ситуация продлится до тех пор, пока число сообщений очереди записи модуля 1 не станет меньше нижней ватерлинии.

Пока существует возникшая блокировка передачи, затор будет постепенно распространяться вверх по потоку, последовательно заполняя очереди модулей, пока, в конечном итоге, не достигнет головного модуля. Поскольку передачу данных в головной модуль (вниз по потоку) инициирует приложение, попытка передать данные в переполненный головной модуль вызовет блокирование процесса[61] и переход его в состояние сна.

В конечном итоге, модуль 1 обработает сообщения своей очереди, и их число станет меньше нижней ватерлинии. Как только очередь модуля 1 станет готовой к приему новых сообщений, планировщик STREAMS автоматически вызовет процедуры xxservice() для модулей, ожидавших освобождения очереди модуля в нашем примере — для модуля 3.

1 ... 88 89 90 91 92 93 94 95 96 ... 130
На этой странице вы можете бесплатно читать книгу Операционная система UNIX - Андрей Робачевский бесплатно.
Похожие на Операционная система UNIX - Андрей Робачевский книги

Оставить комментарий