1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  
22  
23  
24  
25  package org.utgenome.format.wig;
26  
27  import java.io.IOException;
28  import java.io.PipedReader;
29  import java.io.PipedWriter;
30  import java.io.PrintWriter;
31  import java.io.Reader;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Executors;
34  
35  import org.utgenome.UTGBException;
36  import org.xerial.util.log.Logger;
37  
38  public class WIG2SilkReader extends Reader
39  {
40  
41      private static Logger _logger = Logger.getLogger(WIG2SilkReader.class);
42  
43      private final Reader wigReader;
44      private ExecutorService threadPool;
45  
46      private final PipedWriter pipeOut;
47      private final PipedReader pipeIn;
48  
49      private boolean hasStarted = false;
50  
51      public WIG2SilkReader(Reader wigReader) throws IOException
52      {
53          this.wigReader = wigReader;
54  
55          pipeOut = new PipedWriter();
56          pipeIn = new PipedReader(pipeOut);
57  
58      }
59  
60      private static class PipeWorker implements Runnable
61      {
62  
63          private final WIG2Silk wig2silk;
64          private final PrintWriter out;
65  
66          public PipeWorker(Reader in, PrintWriter out) throws IOException
67          {
68              wig2silk = new WIG2Silk(in);
69              this.out = out;
70          }
71  
72          public void run()
73          {
74              if (out == null)
75                  return;
76              try
77              {
78                  wig2silk.toSilk(out);
79                  out.close();
80              }
81              catch (IOException e)
82              {
83                  _logger.error(e);
84              }
85              catch (UTGBException e)
86              {
87                  _logger.error(e);
88              }
89          }
90  
91      }
92  
93      @Override
94      public void close() throws IOException
95      {
96          pipeIn.close();
97          wigReader.close();
98      }
99  
100     @Override
101     public int read(char[] cbuf, int off, int len) throws IOException
102     {
103 
104         if (!hasStarted)
105         {
106             threadPool = Executors.newFixedThreadPool(1);
107             threadPool.submit(new PipeWorker(wigReader, new PrintWriter(pipeOut)));
108             hasStarted = true;
109         }
110 
111         int ret = pipeIn.read(cbuf, off, len);
112 
113         if (ret == -1)
114         {
115             threadPool.shutdownNow();
116         }
117         return ret;
118     }
119 
120 }