вторник, 7 февраля 2012 г.

Java. Pipe. Реализации и сравнение.

Ну вот уж наконец добрался до написания статей. Каникулы прошли, но на них писать было абсолютно лень и вот руки по-тихоньку дошли.:)

Первое с чего решил я начать - Пайпы (Pipe). Pipe является разновидностью "трубок", а конкретно неименнованной однонаправленной "трубкой", т.е. каналом, данные по которому идут в одном направлении, в отличие от сокета, трубки с дву-направленным каналом, данные в котором могут идти в обоих направлениях. Неименованной же трубкой Pipe называется из-за того, что указатель(дескриптор) на уже созданный Pipe можно получить лишь передав его как переменную внутри программы. Именованные трубки, такие как сокет и линуксовый mkfifo могут быть доступны вне создавшей их программы и даже из сети(сокет).

Все выше перечисленные разновидности "трубок" являются незаменимыми средствами межпроцессного взаимодействия (Inter Process Communications - IPC) и реализованы в любой ОС... И соответственно реализованы и в Java (аж 3 способами), т.к. в виртуальной машине Java тоже реализованы свои потоки.


Итак поставим цель и перейдем к реализации:
Пусть программа состоит из 2 потоков, которые должны взаимодействовать. Для удобства взаимодействия сделаем удобную обертку объединяющую 2 пайпа и делающую что то на подобие сокета. Т.е. каждому из 2 потоков будет передаваться объект с методами записи и чтения, каждый из которых пишет/читает в/из одного из 2 объединенных пайпов. Для начала ограничимся передачей текстовых сообщений.

Для удобства опишем интерфейс, который нужно реализовать для каждого из 3 способов реализации:
  1. public interface IPipe {  
  2. public void Write(String text);  
  3. public String Read();  
  4. }

Способ первый. java.nio.Pipe
На мой взгляд самый предпочтительный вариант, ибо следующие 2 дают задержку при передаче примерно на 1 секунду. Если честно не понял в чем проблема, буду благодарен за подсказку:).
Для начала приведем полный листинг полученного класса и разберем его:
  1. import java.io.IOException;  
  2. import java.nio.ByteBuffer;  
  3. import java.nio.channels.Pipe;  
  4. import java.nio.channels.Pipe.SinkChannel;  
  5. import java.nio.channels.Pipe.SourceChannel;  
  6.   
  7. /** 
  8.  * 
  9.  * @author Александр Емельянов (mr.lex91@gmail.com) 
  10.  */  
  11. public class PipeConnectorBuffered implements IPipe{  
  12.     private SinkChannel writer;  
  13.     private SourceChannel reader;  
  14.     private String part="";  
  15.       
  16.     public static IPipe[] createPipe() throws IOException  
  17.     {  
  18.         PipeConnectorBuffered[] pipeConnector=new PipeConnectorBuffered[2];  
  19.         Pipe p1=Pipe.open();  
  20.         Pipe p2=Pipe.open();  
  21.         pipeConnector[0]=new PipeConnectorBuffered(p1.source(), p2.sink());  
  22.         pipeConnector[1]=new PipeConnectorBuffered(p2.source(), p1.sink());  
  23.         return pipeConnector;  
  24.     }  
  25.       
  26.     PipeConnectorBuffered(SourceChannel reader, SinkChannel writer)  
  27.     {  
  28.         this.reader=reader;  
  29.         this.writer=writer;  
  30.     }  
  31.     @Override  
  32.     public String Read()  
  33.     {  
  34.         int n=1024;  
  35.         int i=n;  
  36.         ByteBuffer buf1 = ByteBuffer.allocate(n);  
  37.         String str=part;  
  38.         try {  
  39.             while (true)  
  40.             {  
  41.                 if (str.contains("<END>"))  
  42.                 {  
  43.                     if (str.indexOf("<END>")+6<str.length()) part=str.substring(str.indexOf("<END>")+5, str.length());  
  44.                     else part="";  
  45.                     str=str.substring(0, str.indexOf("<END>"));  
  46.                     return str;  
  47.                 }  
  48.                 buf1.clear();  
  49.                 if(i<n) return str;  
  50.                 i = reader.read(buf1);  
  51.                 buf1.flip();  
  52.                 str+=new String(buf1.array(),0,i);  
  53.             }  
  54.         } catch (IOException ex) {}  
  55.         return str;  
  56.     }  
  57.     @Override  
  58.     public void Write(String text)  
  59.     {  
  60.         ByteBuffer buf = ByteBuffer.allocate(2*text.length()+5);  
  61.         buf.clear();  
  62.         buf.put((text+"<END>").getBytes());  
  63.         buf.flip();  
  64.     try {   
  65.         writer.write(buf);  
  66.     } catch (IOException ex) {}  
  67.     }  
  68. }  
Пайп создается одним простым вызовом Pipe.open() и еще 2 вызовами получаются дескрипторы 2 концов пайпа: SourceChannel(конец пайпа, из которого чиают) и SinkChannel(конец пайпа, в который пишут). Собственно с этими 2 объектами нам и надо разобраться. Что бы создать согласно нашей задаче 2 взаимосвязных объекта необходимо в каждый из них передать пишущий конец одного пайпа и читающий другого. Это и делает статичная функция createPipe(), она возвращает 2 объекта PipeConnectorBuffered, конструктор которого приватен, т.к. напрямую вызывать его вне класса необходимости нет.

SourceChannel принимает на как параметр метода read() и SinkChannel в метод write() переменную типа ByteBuffer. Метод read() возвращает количество прочитанных байт.

Думаю алгоритм записи и чтения сообщения вам понятен (вам понятно для чего нужны метка <END> и переменная part?:)). Если честно, то в остальных 2 реализациях PipeConnector функции read() и write() почти не отличаются (только типом оперируемых данных).
Большего интереса представляют собственно эти самые отличия.

2-й и 3-й способы
В отличие от java.nio.Pipe в этих случаях пайп создается по другому. Сначала создается пишущий конец (PipedOutputStream и PipedWriter), а затем читающий (PipedInputStream и PipedReader), в конструктор которого передается пишущий конец.
createPipe() для PipeConnectorStreamed:
  1. public static IPipe[] createPipe() throws IOException  
  2.     {  
  3.         IPipe[] pc=new PipeConnectorStreamed[2];  
  4.         PipedOutputStream pw1=new PipedOutputStream();  
  5.         PipedInputStream pr1=new PipedInputStream(pw1);  
  6.         PipedOutputStream pw2=new PipedOutputStream();  
  7.         PipedInputStream pr2=new PipedInputStream(pw2);  
  8.         pc[0]=new PipeConnectorStreamed(pr1,pw2);  
  9.         pc[1]=new PipeConnectorStreamed(pr2,pw1);  
  10.         return pc;  
  11.     }  
для PipeConnectorRW:
  1. public static IPipe[] createPipe() throws IOException  
  2.     {  
  3.         IPipe[] pc=new PipeConnectorRW[2];  
  4.         PipedWriter pw1=new PipedWriter();  
  5.         PipedReader pr1=new PipedReader(pw1);  
  6.         PipedWriter pw2=new PipedWriter();  
  7.         PipedReader pr2=new PipedReader(pw2);  
  8.         pc[0]=new PipeConnectorRW(pr1,pw2);  
  9.         pc[1]=new PipeConnectorRW(pr2,pw1);  
  10.         return pc;  
  11.     }  

Теперь вы можете оценить во истину удобство введения интерфейса IPiped :). Вне зависимости от выбора одного из 3 наших классов при тестировании (а переключаться приходится часто;)) использование в дальнейшем коде будет неизменным, поэтому меняется только 1 строчка на соответствующий <имя класса наследника IPiped>.createPipe().

Метод read() будет отличаться типом буфера для считывания. Для PipeConnectorRW ByteBuffer просто заменяется на CharBuffer, а для PipeConnectorStreamed на просто byte[] и соответственно уходят строки buf1.clear() и buf1.flip().
Метод write() для PipeConnectorRW сократиться и станет минимальным с одним вызовом принимающего в качестве параметра строку метода write() PipedWriter'a. А для PipeConnectorStreamed в метод write() необходимо передать именно массив байт, его получим из метода array() нашего ByteBuffer'a, а так же индексы из этого массива, означающие диапазон записываемых в пайп байт. Т.к. нам нужно записать все переданные байты, то в качестве индексов указываем 0 и длину передаваемого текста (text.length()+5).

Полный листинг классов PipeConnectorRW и PipeConnectorStreamed приведен ниже под катом.
PipeConnectorRW
  1. import java.io.IOException;  
  2. import java.io.PipedReader;  
  3. import java.io.PipedWriter;  
  4. import java.nio.ByteBuffer;  
  5. import java.nio.CharBuffer;  
  6.   
  7. /** 
  8.  * 
  9.  * @author Александр Емельянов (mr.lex91@gmail.com) 
  10.  */  
  11. public class PipeConnectorRW implements IPipe{  
  12.     private PipedWriter writer;  
  13.     private PipedReader reader;  
  14.     private String part="";  
  15.   
  16.     public static IPipe[] createPipe() throws IOException  
  17.     {  
  18.         IPipe[] pc=new PipeConnectorRW[2];  
  19.         PipedWriter pw1=new PipedWriter();  
  20.         PipedReader pr1=new PipedReader(pw1);  
  21.         PipedWriter pw2=new PipedWriter();  
  22.         PipedReader pr2=new PipedReader(pw2);  
  23.         pc[0]=new PipeConnectorRW(pr1,pw2);  
  24.         pc[1]=new PipeConnectorRW(pr2,pw1);  
  25.         return pc;  
  26.     }  
  27.     PipeConnectorRW(PipedReader reader, PipedWriter writer)  
  28.     {  
  29.         this.reader=reader;  
  30.         this.writer=writer;  
  31.     }  
  32.   
  33.     public String Read()  
  34.     {  
  35.         int n=1024;  
  36.         int i=n;  
  37.         CharBuffer buf1 = CharBuffer.allocate(n);  
  38.         String str=part;  
  39.         try {  
  40.             while (true)  
  41.             {  
  42.                 if (str.contains("<END>"))  
  43.                 {  
  44.                     if (str.indexOf("<END>")+6<str.length()) part=str.substring(str.indexOf("<END>")+5, str.length());  
  45.                     else part="";  
  46.                     str=str.substring(0, str.indexOf("<END>"));  
  47.                     return str;  
  48.                 }  
  49.                 buf1.clear();  
  50.                 if(i<n) return str;  
  51.                 i = reader.read(buf1);  
  52.                 buf1.flip();  
  53.                 str+=new String(buf1.array(),0,i);  
  54.             }  
  55.         } catch (IOException ex) {}  
  56.         return str;  
  57.     }  
  58.     public void Write(String text)  
  59.     {  
  60.     try {  
  61.         writer.write(text+"<END>");  
  62.     } catch (IOException ex) {}  
  63.     }  
  64. }  


PipeConnectorStreamed
  1. import java.io.*;  
  2. import java.nio.ByteBuffer;  
  3. import java.nio.CharBuffer;  
  4.   
  5. /** 
  6.  * 
  7.  * @author Александр Емельянов (mr.lex91@gmail.com) 
  8.  */  
  9. public class PipeConnectorStreamed implements IPipe{  
  10.     private PipedOutputStream writer;  
  11.     private PipedInputStream reader;  
  12.     private String part="";  
  13.   
  14.     public static IPipe[] createPipe() throws IOException  
  15.     {  
  16.         IPipe[] pc=new PipeConnectorStreamed[2];  
  17.         PipedOutputStream pw1=new PipedOutputStream();  
  18.         PipedInputStream pr1=new PipedInputStream(pw1);  
  19.         PipedOutputStream pw2=new PipedOutputStream();  
  20.         PipedInputStream pr2=new PipedInputStream(pw2);  
  21.         pc[0]=new PipeConnectorStreamed(pr1,pw2);  
  22.         pc[1]=new PipeConnectorStreamed(pr2,pw1);  
  23.         return pc;  
  24.     }  
  25.     PipeConnectorStreamed(PipedInputStream reader, PipedOutputStream writer)  
  26.     {  
  27.         this.reader=reader;  
  28.         this.writer=writer;  
  29.     }  
  30.   
  31.     public String Read()  
  32.     {  
  33.         int n=1024;  
  34.         int i=n;  
  35.         byte[] buf1 = new byte[n];  
  36.         String str=part;  
  37.         try {  
  38.             while (true)  
  39.             {  
  40.                 if (str.contains("<END>"))  
  41.                 {  
  42.                     if (str.indexOf("<END>")+6<str.length()) part=str.substring(str.indexOf("<END>")+5, str.length());  
  43.                     else part="";  
  44.                     str=str.substring(0, str.indexOf("<END>"));  
  45.                     return str;  
  46.                 }  
  47.                 if(i<n) return str;  
  48.                 i = reader.read(buf1);  
  49.                 str+=new String(buf1,0,i);  
  50.             }  
  51.         } catch (IOException ex) {}  
  52.         return str;  
  53.     }  
  54.     public void Write(String text)  
  55.     {  
  56.         ByteBuffer buf = ByteBuffer.allocate(2*text.length()+5);  
  57.         buf.clear();  
  58.         buf.put((text+"<END>").getBytes());  
  59.         buf.flip();  
  60.     try {  
  61.         writer.write(buf.array(),0,text.length()+5);  
  62.     } catch (IOException ex) {}  
  63.     }  
  64. }  
В данной статье представлена только пересылка текста. Пересылка файла либо других бинарных данных не вызовет больших затруднений, ибо передача все равно идет байтовая (кроме PipeConnectorRW).

"Из серии записок "из личного опыта

Комментариев нет:

Отправить комментарий