View Javadoc

1   /*--------------------------------------------------------------------------
2    *  Copyright 2009 utgenome.org
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   *--------------------------------------------------------------------------*/
16  //--------------------------------------
17  // utgb-core Project
18  //
19  // Pipe.java
20  // Since: 2010/05/19
21  //
22  // $URL$ 
23  // $Author$
24  //--------------------------------------
25  package org.utgenome.format;
26  
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.io.InputStreamReader;
30  import java.io.PipedReader;
31  import java.io.PipedWriter;
32  import java.io.Reader;
33  import java.io.Writer;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ExecutorService;
36  import java.util.concurrent.Executors;
37  
38  import org.xerial.util.log.Logger;
39  
40  /**
41   * A base class for converting an input format into another through a pipe.
42   * 
43   * @author leo
44   * 
45   */
46  public class FormatConversionReader extends Reader {
47  
48  	private static Logger _logger = Logger.getLogger(FormatConversionReader.class);
49  
50  	private ExecutorService threadPool;
51  
52  	private final PipeConsumer worker;
53  	private PipeRunner pipeRunner;
54  
55  	private boolean hasStarted = false;
56  
57  	public FormatConversionReader(Reader input, PipeConsumer worker) throws IOException {
58  		pipeRunner = new PipeRunnerForReader(input);
59  		this.worker = worker;
60  	}
61  
62  	public FormatConversionReader(InputStream input, PipeConsumer worker) throws IOException {
63  		pipeRunner = new PipeRunnerForInputStream(input);
64  		this.worker = worker;
65  	}
66  
67  	public static abstract class PipeConsumer {
68  
69  		/**
70  		 * Read the data from <i>in</i>, then output the data to <i>out</i>
71  		 * 
72  		 * @param in
73  		 * @param out
74  		 */
75  		public void consume(Reader in, Writer out) throws Exception {
76  			// do some thing;
77  		}
78  
79  		public void consume(InputStream in, Writer out) throws Exception {
80  			this.consume(new InputStreamReader(in), out);
81  		}
82  
83  		public void reportError(Exception e) {
84  			_logger.error(e);
85  		}
86  	}
87  
88  	private abstract class PipeRunner implements Callable<Void> {
89  		protected final PipedWriter pipeOut;
90  		private final PipedReader pipeIn;
91  
92  		public PipeRunner() throws IOException {
93  			pipeOut = new PipedWriter();
94  			pipeIn = new PipedReader(pipeOut);
95  		}
96  
97  		public int readPipe(char[] cbuf, int off, int len) throws IOException {
98  			return pipeIn.read(cbuf, off, len);
99  		}
100 
101 		public Void call() throws Exception {
102 			try {
103 				consume();
104 			}
105 			catch (Exception e) {
106 				_logger.error(e);
107 			}
108 			finally {
109 				if (pipeOut != null)
110 					pipeOut.close();
111 			}
112 			return null;
113 		}
114 
115 		public abstract void consume() throws Exception;
116 
117 		public void close() throws IOException {
118 			if (pipeIn != null)
119 				pipeIn.close();
120 		}
121 	}
122 
123 	private class PipeRunnerForReader extends PipeRunner {
124 
125 		private final Reader in;
126 
127 		public PipeRunnerForReader(Reader in) throws IOException {
128 			super();
129 			this.in = in;
130 			if (in == null)
131 				throw new NullPointerException("missing input reader");
132 		}
133 
134 		public void close() throws IOException {
135 			super.close();
136 			if (in != null)
137 				in.close();
138 		}
139 
140 		@Override
141 		public void consume() throws Exception {
142 			worker.consume(in, pipeOut);
143 		}
144 
145 	}
146 
147 	private class PipeRunnerForInputStream extends PipeRunner {
148 
149 		private final InputStream in;
150 
151 		public PipeRunnerForInputStream(InputStream in) throws IOException {
152 			super();
153 			this.in = in;
154 
155 			if (in == null)
156 				throw new NullPointerException("missing input stream");
157 		}
158 
159 		@Override
160 		public void consume() throws Exception {
161 			worker.consume(in, pipeOut);
162 		}
163 
164 		public void close() throws IOException {
165 			super.close();
166 			if (in != null)
167 				in.close();
168 		}
169 
170 	}
171 
172 	@Override
173 	public void close() throws IOException {
174 		pipeRunner.close();
175 	}
176 
177 	@Override
178 	public int read(char[] cbuf, int off, int len) throws IOException {
179 
180 		if (!hasStarted) {
181 			threadPool = Executors.newFixedThreadPool(1);
182 			threadPool.submit(pipeRunner);
183 			hasStarted = true;
184 		}
185 
186 		int ret = pipeRunner.readPipe(cbuf, off, len);
187 
188 		if (ret == -1) {
189 			threadPool.shutdownNow();
190 		}
191 		return ret;
192 	}
193 
194 }