1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package org.utgenome.format;
26
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.io.InputStreamReader;
30 import java.io.PipedReader;
31 import java.io.PipedWriter;
32 import java.io.Reader;
33 import java.io.Writer;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37
38 import org.xerial.util.log.Logger;
39
40
41
42
43
44
45
46 public class FormatConversionReader extends Reader {
47
48 private static Logger _logger = Logger.getLogger(FormatConversionReader.class);
49
50 private ExecutorService threadPool;
51
52 private final PipeConsumer worker;
53 private PipeRunner pipeRunner;
54
55 private boolean hasStarted = false;
56
57 public FormatConversionReader(Reader input, PipeConsumer worker) throws IOException {
58 pipeRunner = new PipeRunnerForReader(input);
59 this.worker = worker;
60 }
61
62 public FormatConversionReader(InputStream input, PipeConsumer worker) throws IOException {
63 pipeRunner = new PipeRunnerForInputStream(input);
64 this.worker = worker;
65 }
66
67 public static abstract class PipeConsumer {
68
69
70
71
72
73
74
75 public void consume(Reader in, Writer out) throws Exception {
76
77 }
78
79 public void consume(InputStream in, Writer out) throws Exception {
80 this.consume(new InputStreamReader(in), out);
81 }
82
83 public void reportError(Exception e) {
84 _logger.error(e);
85 }
86 }
87
88 private abstract class PipeRunner implements Callable<Void> {
89 protected final PipedWriter pipeOut;
90 private final PipedReader pipeIn;
91
92 public PipeRunner() throws IOException {
93 pipeOut = new PipedWriter();
94 pipeIn = new PipedReader(pipeOut);
95 }
96
97 public int readPipe(char[] cbuf, int off, int len) throws IOException {
98 return pipeIn.read(cbuf, off, len);
99 }
100
101 public Void call() throws Exception {
102 try {
103 consume();
104 }
105 catch (Exception e) {
106 _logger.error(e);
107 }
108 finally {
109 if (pipeOut != null)
110 pipeOut.close();
111 }
112 return null;
113 }
114
115 public abstract void consume() throws Exception;
116
117 public void close() throws IOException {
118 if (pipeIn != null)
119 pipeIn.close();
120 }
121 }
122
123 private class PipeRunnerForReader extends PipeRunner {
124
125 private final Reader in;
126
127 public PipeRunnerForReader(Reader in) throws IOException {
128 super();
129 this.in = in;
130 if (in == null)
131 throw new NullPointerException("missing input reader");
132 }
133
134 public void close() throws IOException {
135 super.close();
136 if (in != null)
137 in.close();
138 }
139
140 @Override
141 public void consume() throws Exception {
142 worker.consume(in, pipeOut);
143 }
144
145 }
146
147 private class PipeRunnerForInputStream extends PipeRunner {
148
149 private final InputStream in;
150
151 public PipeRunnerForInputStream(InputStream in) throws IOException {
152 super();
153 this.in = in;
154
155 if (in == null)
156 throw new NullPointerException("missing input stream");
157 }
158
159 @Override
160 public void consume() throws Exception {
161 worker.consume(in, pipeOut);
162 }
163
164 public void close() throws IOException {
165 super.close();
166 if (in != null)
167 in.close();
168 }
169
170 }
171
172 @Override
173 public void close() throws IOException {
174 pipeRunner.close();
175 }
176
177 @Override
178 public int read(char[] cbuf, int off, int len) throws IOException {
179
180 if (!hasStarted) {
181 threadPool = Executors.newFixedThreadPool(1);
182 threadPool.submit(pipeRunner);
183 hasStarted = true;
184 }
185
186 int ret = pipeRunner.readPipe(cbuf, off, len);
187
188 if (ret == -1) {
189 threadPool.shutdownNow();
190 }
191 return ret;
192 }
193
194 }