001    /*
002     * Copyright (C) 2012 eXo Platform SAS.
003     *
004     * This is free software; you can redistribute it and/or modify it
005     * under the terms of the GNU Lesser General Public License as
006     * published by the Free Software Foundation; either version 2.1 of
007     * the License, or (at your option) any later version.
008     *
009     * This software is distributed in the hope that it will be useful,
010     * but WITHOUT ANY WARRANTY; without even the implied warranty of
011     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012     * Lesser General Public License for more details.
013     *
014     * You should have received a copy of the GNU Lesser General Public
015     * License along with this software; if not, write to the Free
016     * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
017     * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
018     */
019    
020    package org.crsh.shell.impl.async;
021    
022    import org.crsh.keyboard.KeyHandler;
023    import org.crsh.text.Screenable;
024    import org.crsh.shell.ShellProcess;
025    import org.crsh.shell.ShellProcessContext;
026    import org.crsh.shell.ShellResponse;
027    import org.crsh.text.Style;
028    
029    import java.io.IOException;
030    import java.util.concurrent.Callable;
031    
032    public class AsyncProcess implements ShellProcess {
033    
034    
035      /** . */
036      private final String request;
037    
038      /** . */
039      private ShellProcessContext caller;
040    
041      /** . */
042      private ShellProcess callee;
043    
044      /** . */
045      private AsyncShell shell;
046    
047      /** . */
048      private Status status;
049    
050      /** . */
051      private final Object lock;
052    
053      AsyncProcess(AsyncShell shell, String request) {
054        this.shell = shell;
055        this.request = request;
056        this.callee = null;
057        this.status = Status.CONSTRUCTED;
058        this.lock = new Object();
059      }
060    
061      public Status getStatus() {
062        return status;
063      }
064    
065      /** . */
066      private final ShellProcessContext context = new ShellProcessContext() {
067        public int getWidth() {
068          return caller.getWidth();
069        }
070    
071        public int getHeight() {
072          return caller.getHeight();
073        }
074    
075        public String getProperty(String name) {
076          return caller.getProperty(name);
077        }
078    
079        public boolean takeAlternateBuffer() throws IOException {
080          return caller.takeAlternateBuffer();
081        }
082    
083        public boolean releaseAlternateBuffer() throws IOException {
084          return caller.releaseAlternateBuffer();
085        }
086    
087        public String readLine(String msg, boolean echo) throws IOException, InterruptedException {
088          return caller.readLine(msg, echo);
089        }
090    
091        public Screenable append(CharSequence s) throws IOException {
092          caller.append(s);
093          return this;
094        }
095    
096        @Override
097        public Screenable append(char c) throws IOException {
098          caller.append(c);
099          return this;
100        }
101    
102        @Override
103        public Screenable append(CharSequence csq, int start, int end) throws IOException {
104          caller.append(csq, start, end);
105          return this;
106        }
107    
108        public Screenable append(Style style) throws IOException {
109          caller.append(style);
110          return this;
111        }
112    
113        public Screenable cls() throws IOException {
114          caller.cls();
115          return this;
116        }
117    
118        public void flush() throws IOException {
119          caller.flush();
120        }
121    
122        public void end(ShellResponse response) {
123          // Always leave the status in terminated status if the method succeeds
124          // Cancelled -> Terminated
125          // Evaluating -> Terminated
126          // Terminated -> Terminated
127          synchronized (lock) {
128            switch (status) {
129              case CONSTRUCTED:
130              case QUEUED:
131                throw new AssertionError("Should not happen");
132              case CANCELED:
133                // We substitute the response
134                response = ShellResponse.cancelled();
135                status = Status.TERMINATED;
136                break;
137              case EVALUATING:
138                status = Status.TERMINATED;
139                break;
140              case TERMINATED:
141                throw new IllegalStateException("Cannot end a process already terminated");
142            }
143          }
144    
145          //
146          caller.end(response);
147        }
148      };
149    
150      @Override
151      public KeyHandler getKeyHandler() {
152        synchronized (lock) {
153          if (status != Status.EVALUATING) {
154            throw new IllegalStateException();
155          }
156        }
157        // Should it be synchronous or not ????
158        // no clue for now :-)
159        return callee.getKeyHandler();
160      }
161    
162      public void execute(ShellProcessContext processContext) {
163    
164        // Constructed -> Queued
165        synchronized (lock) {
166          if (status != Status.CONSTRUCTED) {
167            throw new IllegalStateException("State was " + status);
168          }
169    
170          // Update state
171          status = Status.QUEUED;
172          callee = shell.shell.createProcess(request);
173          caller = processContext;
174        }
175    
176        // Create the task
177        Callable<AsyncProcess> task = new Callable<AsyncProcess>() {
178          public AsyncProcess call() throws Exception {
179            try {
180              // Cancelled -> Cancelled
181              // Queued -> Evaluating
182              ShellResponse response;
183              synchronized (lock) {
184                switch (status) {
185                  case CANCELED:
186                    // Do nothing it was canceled in the mean time
187                    response = ShellResponse.cancelled();
188                    break;
189                  case QUEUED:
190                    // Ok we are going to run it
191                    status = Status.EVALUATING;
192                    response = null;
193                    break;
194                  default:
195                    // Just in case but this can only be called by the queue
196                    throw new AssertionError();
197                }
198              }
199    
200              // Execute the process if we are in evalating state
201              if (response == null) {
202                // Here the status could already be in status cancelled
203                // it is a race condition, execution still happens
204                // but the callback of the callee to the end method will make the process
205                // terminate and use a cancel response
206                try {
207                  callee.execute(context);
208                  response = ShellResponse.ok();
209                }
210                catch (Throwable t) {
211                  response = ShellResponse.internalError("Unexpected throwable when executing process", t);
212                }
213              }
214    
215              // Make the callback
216              // Calling terminated twice will have no effect
217              try {
218                context.end(response);
219              }
220              catch (Throwable t) {
221                // Log it
222              }
223    
224              // We return this but we don't really care for now
225              return AsyncProcess.this;
226            }
227            finally {
228              synchronized (shell.lock) {
229                shell.processes.remove(AsyncProcess.this);
230              }
231            }
232          }
233        };
234    
235        //
236        synchronized (shell.lock) {
237          if (!shell.closed) {
238            shell.executor.submit(task);
239            shell.processes.add(this);
240          } else {
241            boolean invokeEnd;
242            synchronized (lock) {
243              invokeEnd = status != Status.TERMINATED;
244              status = Status.TERMINATED;
245            }
246            if (invokeEnd) {
247              caller.end(ShellResponse.cancelled());
248            }
249          }
250        }
251      }
252    
253      public void cancel() {
254        // Construcuted -> ISE
255        // Evaluating -> Canceled
256        // Queued -> Canceled
257        // Cancelled -> Cancelled
258        // Terminated -> Terminated
259        boolean cancel;
260        synchronized (lock) {
261          switch (status) {
262            case CONSTRUCTED:
263              throw new IllegalStateException("Cannot call cancel on process that was not scheduled for execution yet");
264            case QUEUED:
265              status = Status.CANCELED;
266              cancel = false;
267              break;
268            case EVALUATING:
269              status = Status.CANCELED;
270              cancel = true;
271              break;
272            default:
273              cancel = false;
274              break;
275          }
276        }
277    
278        //
279        if (cancel) {
280          callee.cancel();
281        }
282      }
283    }