001 /* 002 * Copyright (C) 2012 eXo Platform SAS. 003 * 004 * This is free software; you can redistribute it and/or modify it 005 * under the terms of the GNU Lesser General Public License as 006 * published by the Free Software Foundation; either version 2.1 of 007 * the License, or (at your option) any later version. 008 * 009 * This software is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * You should have received a copy of the GNU Lesser General Public 015 * License along with this software; if not, write to the Free 016 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 017 * 02110-1301 USA, or see the FSF site: http://www.fsf.org. 018 */ 019 020 package org.crsh.shell.impl.async; 021 022 import org.crsh.keyboard.KeyHandler; 023 import org.crsh.text.Screenable; 024 import org.crsh.shell.ShellProcess; 025 import org.crsh.shell.ShellProcessContext; 026 import org.crsh.shell.ShellResponse; 027 import org.crsh.text.Style; 028 029 import java.io.IOException; 030 import java.util.concurrent.Callable; 031 032 public class AsyncProcess implements ShellProcess { 033 034 035 /** . */ 036 private final String request; 037 038 /** . */ 039 private ShellProcessContext caller; 040 041 /** . */ 042 private ShellProcess callee; 043 044 /** . */ 045 private AsyncShell shell; 046 047 /** . */ 048 private Status status; 049 050 /** . */ 051 private final Object lock; 052 053 AsyncProcess(AsyncShell shell, String request) { 054 this.shell = shell; 055 this.request = request; 056 this.callee = null; 057 this.status = Status.CONSTRUCTED; 058 this.lock = new Object(); 059 } 060 061 public Status getStatus() { 062 return status; 063 } 064 065 /** . */ 066 private final ShellProcessContext context = new ShellProcessContext() { 067 public int getWidth() { 068 return caller.getWidth(); 069 } 070 071 public int getHeight() { 072 return caller.getHeight(); 073 } 074 075 public String getProperty(String name) { 076 return caller.getProperty(name); 077 } 078 079 public boolean takeAlternateBuffer() throws IOException { 080 return caller.takeAlternateBuffer(); 081 } 082 083 public boolean releaseAlternateBuffer() throws IOException { 084 return caller.releaseAlternateBuffer(); 085 } 086 087 public String readLine(String msg, boolean echo) throws IOException, InterruptedException { 088 return caller.readLine(msg, echo); 089 } 090 091 public Screenable append(CharSequence s) throws IOException { 092 caller.append(s); 093 return this; 094 } 095 096 @Override 097 public Screenable append(char c) throws IOException { 098 caller.append(c); 099 return this; 100 } 101 102 @Override 103 public Screenable append(CharSequence csq, int start, int end) throws IOException { 104 caller.append(csq, start, end); 105 return this; 106 } 107 108 public Screenable append(Style style) throws IOException { 109 caller.append(style); 110 return this; 111 } 112 113 public Screenable cls() throws IOException { 114 caller.cls(); 115 return this; 116 } 117 118 public void flush() throws IOException { 119 caller.flush(); 120 } 121 122 public void end(ShellResponse response) { 123 // Always leave the status in terminated status if the method succeeds 124 // Cancelled -> Terminated 125 // Evaluating -> Terminated 126 // Terminated -> Terminated 127 synchronized (lock) { 128 switch (status) { 129 case CONSTRUCTED: 130 case QUEUED: 131 throw new AssertionError("Should not happen"); 132 case CANCELED: 133 // We substitute the response 134 response = ShellResponse.cancelled(); 135 status = Status.TERMINATED; 136 break; 137 case EVALUATING: 138 status = Status.TERMINATED; 139 break; 140 case TERMINATED: 141 throw new IllegalStateException("Cannot end a process already terminated"); 142 } 143 } 144 145 // 146 caller.end(response); 147 } 148 }; 149 150 @Override 151 public KeyHandler getKeyHandler() { 152 synchronized (lock) { 153 if (status != Status.EVALUATING) { 154 throw new IllegalStateException(); 155 } 156 } 157 // Should it be synchronous or not ???? 158 // no clue for now :-) 159 return callee.getKeyHandler(); 160 } 161 162 public void execute(ShellProcessContext processContext) { 163 164 // Constructed -> Queued 165 synchronized (lock) { 166 if (status != Status.CONSTRUCTED) { 167 throw new IllegalStateException("State was " + status); 168 } 169 170 // Update state 171 status = Status.QUEUED; 172 callee = shell.shell.createProcess(request); 173 caller = processContext; 174 } 175 176 // Create the task 177 Callable<AsyncProcess> task = new Callable<AsyncProcess>() { 178 public AsyncProcess call() throws Exception { 179 try { 180 // Cancelled -> Cancelled 181 // Queued -> Evaluating 182 ShellResponse response; 183 synchronized (lock) { 184 switch (status) { 185 case CANCELED: 186 // Do nothing it was canceled in the mean time 187 response = ShellResponse.cancelled(); 188 break; 189 case QUEUED: 190 // Ok we are going to run it 191 status = Status.EVALUATING; 192 response = null; 193 break; 194 default: 195 // Just in case but this can only be called by the queue 196 throw new AssertionError(); 197 } 198 } 199 200 // Execute the process if we are in evalating state 201 if (response == null) { 202 // Here the status could already be in status cancelled 203 // it is a race condition, execution still happens 204 // but the callback of the callee to the end method will make the process 205 // terminate and use a cancel response 206 try { 207 callee.execute(context); 208 response = ShellResponse.ok(); 209 } 210 catch (Throwable t) { 211 response = ShellResponse.internalError("Unexpected throwable when executing process", t); 212 } 213 } 214 215 // Make the callback 216 // Calling terminated twice will have no effect 217 try { 218 context.end(response); 219 } 220 catch (Throwable t) { 221 // Log it 222 } 223 224 // We return this but we don't really care for now 225 return AsyncProcess.this; 226 } 227 finally { 228 synchronized (shell.lock) { 229 shell.processes.remove(AsyncProcess.this); 230 } 231 } 232 } 233 }; 234 235 // 236 synchronized (shell.lock) { 237 if (!shell.closed) { 238 shell.executor.submit(task); 239 shell.processes.add(this); 240 } else { 241 boolean invokeEnd; 242 synchronized (lock) { 243 invokeEnd = status != Status.TERMINATED; 244 status = Status.TERMINATED; 245 } 246 if (invokeEnd) { 247 caller.end(ShellResponse.cancelled()); 248 } 249 } 250 } 251 } 252 253 public void cancel() { 254 // Construcuted -> ISE 255 // Evaluating -> Canceled 256 // Queued -> Canceled 257 // Cancelled -> Cancelled 258 // Terminated -> Terminated 259 boolean cancel; 260 synchronized (lock) { 261 switch (status) { 262 case CONSTRUCTED: 263 throw new IllegalStateException("Cannot call cancel on process that was not scheduled for execution yet"); 264 case QUEUED: 265 status = Status.CANCELED; 266 cancel = false; 267 break; 268 case EVALUATING: 269 status = Status.CANCELED; 270 cancel = true; 271 break; 272 default: 273 cancel = false; 274 break; 275 } 276 } 277 278 // 279 if (cancel) { 280 callee.cancel(); 281 } 282 } 283 }