001    /*
002     * Copyright (C) 2012 eXo Platform SAS.
003     *
004     * This is free software; you can redistribute it and/or modify it
005     * under the terms of the GNU Lesser General Public License as
006     * published by the Free Software Foundation; either version 2.1 of
007     * the License, or (at your option) any later version.
008     *
009     * This software is distributed in the hope that it will be useful,
010     * but WITHOUT ANY WARRANTY; without even the implied warranty of
011     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012     * Lesser General Public License for more details.
013     *
014     * You should have received a copy of the GNU Lesser General Public
015     * License along with this software; if not, write to the Free
016     * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
017     * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
018     */
019    
020    package org.crsh.shell.impl.async;
021    
022    import org.crsh.shell.ShellProcess;
023    import org.crsh.shell.ShellProcessContext;
024    import org.crsh.shell.ShellResponse;
025    import org.crsh.text.Chunk;
026    
027    import java.io.IOException;
028    import java.util.concurrent.Callable;
029    
030    public class AsyncProcess implements ShellProcess {
031    
032    
033      /** . */
034      private final String request;
035    
036      /** . */
037      private ShellProcessContext caller;
038    
039      /** . */
040      private ShellProcess callee;
041    
042      /** . */
043      private AsyncShell shell;
044    
045      /** . */
046      private Status status;
047    
048      /** . */
049      private final Object lock;
050    
051      AsyncProcess(AsyncShell shell, String request) {
052        this.shell = shell;
053        this.request = request;
054        this.callee = null;
055        this.status = Status.CONSTRUCTED;
056        this.lock = new Object();
057      }
058    
059      public Status getStatus() {
060        return status;
061      }
062    
063      /** . */
064      private final ShellProcessContext context = new ShellProcessContext() {
065        public int getWidth() {
066          return caller.getWidth();
067        }
068    
069        public int getHeight() {
070          return caller.getHeight();
071        }
072    
073        public String getProperty(String name) {
074          return caller.getProperty(name);
075        }
076    
077        public boolean takeAlternateBuffer() throws IOException {
078          return caller.takeAlternateBuffer();
079        }
080    
081        public boolean releaseAlternateBuffer() throws IOException {
082          return caller.releaseAlternateBuffer();
083        }
084    
085        public String readLine(String msg, boolean echo) {
086          return caller.readLine(msg, echo);
087        }
088    
089        public Class<Chunk> getConsumedType() {
090          return Chunk.class;
091        }
092    
093        public void provide(Chunk element) throws IOException {
094          caller.provide(element);
095        }
096    
097        public void flush() throws IOException {
098          caller.flush();
099        }
100    
101        public void end(ShellResponse response) {
102          // Always leave the status in terminated status if the method succeeds
103          // Cancelled -> Terminated
104          // Evaluating -> Terminated
105          // Terminated -> Terminated
106          synchronized (lock) {
107            switch (status) {
108              case CONSTRUCTED:
109              case QUEUED:
110                throw new AssertionError("Should not happen");
111              case CANCELED:
112                // We substitute the response
113                response = ShellResponse.cancelled();
114                status = Status.TERMINATED;
115                break;
116              case EVALUATING:
117                status = Status.TERMINATED;
118                break;
119              case TERMINATED:
120                throw new IllegalStateException("Cannot end a process already terminated");
121            }
122          }
123    
124          //
125          caller.end(response);
126        }
127      };
128    
129      public void execute(ShellProcessContext processContext) {
130    
131        // Constructed -> Queued
132        synchronized (lock) {
133          if (status != Status.CONSTRUCTED) {
134            throw new IllegalStateException("State was " + status);
135          }
136    
137          // Update state
138          status = Status.QUEUED;
139          callee = shell.shell.createProcess(request);
140          caller = processContext;
141        }
142    
143        // Create the task
144        Callable<AsyncProcess> task = new Callable<AsyncProcess>() {
145          public AsyncProcess call() throws Exception {
146            try {
147              // Cancelled -> Cancelled
148              // Queued -> Evaluating
149              ShellResponse response;
150              synchronized (lock) {
151                switch (status) {
152                  case CANCELED:
153                    // Do nothing it was canceled in the mean time
154                    response = ShellResponse.cancelled();
155                    break;
156                  case QUEUED:
157                    // Ok we are going to run it
158                    status = Status.EVALUATING;
159                    response = null;
160                    break;
161                  default:
162                    // Just in case but this can only be called by the queue
163                    throw new AssertionError();
164                }
165              }
166    
167              // Execute the process if we are in evalating state
168              if (response == null) {
169                // Here the status could already be in status cancelled
170                // it is a race condition, execution still happens
171                // but the callback of the callee to the end method will make the process
172                // terminate and use a cancel response
173                try {
174                  callee.execute(context);
175                  response = ShellResponse.ok();
176                }
177                catch (Throwable t) {
178                  response = ShellResponse.internalError("Unexpected throwable when executing process", t);
179                }
180              }
181    
182              // Make the callback
183              // Calling terminated twice will have no effect
184              try {
185                context.end(response);
186              }
187              catch (Throwable t) {
188                // Log it
189              }
190    
191              // We return this but we don't really care for now
192              return AsyncProcess.this;
193            }
194            finally {
195              synchronized (shell.lock) {
196                shell.processes.remove(AsyncProcess.this);
197              }
198            }
199          }
200        };
201    
202        //
203        synchronized (shell.lock) {
204          if (!shell.closed) {
205            shell.executor.submit(task);
206            shell.processes.add(this);
207          } else {
208            boolean invokeEnd;
209            synchronized (lock) {
210              invokeEnd = status != Status.TERMINATED;
211              status = Status.TERMINATED;
212            }
213            if (invokeEnd) {
214              caller.end(ShellResponse.cancelled());
215            }
216          }
217        }
218      }
219    
220      public void cancel() {
221        // Construcuted -> ISE
222        // Evaluating -> Canceled
223        // Queued -> Canceled
224        // Cancelled -> Cancelled
225        // Terminated -> Terminated
226        boolean cancel;
227        synchronized (lock) {
228          switch (status) {
229            case CONSTRUCTED:
230              throw new IllegalStateException("Cannot call cancel on process that was not scheduled for execution yet");
231            case QUEUED:
232              status = Status.CANCELED;
233              cancel = false;
234              break;
235            case EVALUATING:
236              status = Status.CANCELED;
237              cancel = true;
238              break;
239            default:
240              cancel = false;
241              break;
242          }
243        }
244    
245        //
246        if (cancel) {
247          callee.cancel();
248        }
249      }
250    }