/*
 * Copyright (C) 2012 eXo Platform SAS.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */

package org.crsh.shell.impl.async;

import org.crsh.shell.ShellProcess;
import org.crsh.shell.ShellProcessContext;
import org.crsh.shell.ShellResponse;
import org.crsh.text.Chunk;

import java.io.IOException;
import java.util.concurrent.Callable;

/**
 * A {@link ShellProcess} that wraps a delegate process created by the shell held by an
 * {@link AsyncShell} and runs it through that shell's executor.
 *
 * <p>The process moves through the following states, every transition being made while
 * holding {@link #lock}:</p>
 * <ul>
 *   <li>{@code CONSTRUCTED -> QUEUED} in {@link #execute(ShellProcessContext)}</li>
 *   <li>{@code QUEUED -> EVALUATING} when the submitted task starts running</li>
 *   <li>{@code QUEUED / EVALUATING -> CANCELED} in {@link #cancel()}</li>
 *   <li>{@code EVALUATING / CANCELED -> TERMINATED} when the wrapping context is ended</li>
 *   <li>{@code QUEUED -> TERMINATED} when the shell is already closed at execution time</li>
 * </ul>
 */
public class AsyncProcess implements ShellProcess {


  /** The request used to create the delegate process. */
  private final String request;

  /** The context supplied by the caller of {@link #execute(ShellProcessContext)}. */
  private ShellProcessContext caller;

  /** The delegate process, created when {@link #execute(ShellProcessContext)} is called. */
  private ShellProcess callee;

  /** The owning asynchronous shell. */
  private final AsyncShell shell;

  /** The current status, guarded by {@link #lock}. */
  private Status status;

  /** The monitor guarding {@link #status}. */
  private final Object lock;

  AsyncProcess(AsyncShell shell, String request) {
    this.shell = shell;
    this.request = request;
    this.callee = null;
    this.status = Status.CONSTRUCTED;
    this.lock = new Object();
  }

  /**
   * Returns the current status of this process.
   *
   * @return the status as last written under the process lock
   */
  public Status getStatus() {
    // The status is written under the lock by other threads, read it under the same lock
    // so that the most recent value is observed
    synchronized (lock) {
      return status;
    }
  }

  /**
   * The context handed to the delegate process: every call is forwarded to the caller context,
   * except {@code end} which first updates the status of this process.
   */
  private final ShellProcessContext context = new ShellProcessContext() {
    public int getWidth() {
      return caller.getWidth();
    }

    public int getHeight() {
      return caller.getHeight();
    }

    public String getProperty(String name) {
      return caller.getProperty(name);
    }

    public boolean takeAlternateBuffer() throws IOException {
      return caller.takeAlternateBuffer();
    }

    public boolean releaseAlternateBuffer() throws IOException {
      return caller.releaseAlternateBuffer();
    }

    public String readLine(String msg, boolean echo) {
      return caller.readLine(msg, echo);
    }

    public Class<Chunk> getConsumedType() {
      return Chunk.class;
    }

    public void provide(Chunk element) throws IOException {
      caller.provide(element);
    }

    public void flush() throws IOException {
      caller.flush();
    }

    public void end(ShellResponse response) {
      // Always leave the status in terminated status if the method succeeds
      // Canceled   -> Terminated
      // Evaluating -> Terminated
      // Terminated -> IllegalStateException
      synchronized (lock) {
        switch (status) {
          case CONSTRUCTED:
          case QUEUED:
            throw new AssertionError("Should not happen");
          case CANCELED:
            // We substitute the response
            response = ShellResponse.cancelled();
            status = Status.TERMINATED;
            break;
          case EVALUATING:
            status = Status.TERMINATED;
            break;
          case TERMINATED:
            throw new IllegalStateException("Cannot end a process already terminated");
        }
      }

      // The caller is notified once the process lock has been released
      caller.end(response);
    }
  };

  /**
   * Creates the delegate process and submits its execution to the shell executor. When the
   * shell is already closed, nothing is submitted and the caller context is ended with a
   * cancelled response instead.
   *
   * @param processContext the context that receives the output and the final response
   * @throws IllegalStateException if this process is not in the {@code CONSTRUCTED} state
   */
  public void execute(ShellProcessContext processContext) {

    // Constructed -> Queued
    synchronized (lock) {
      if (status != Status.CONSTRUCTED) {
        throw new IllegalStateException("State was " + status);
      }

      // Create the delegate first: if the creation fails the status remains CONSTRUCTED
      // instead of being left in QUEUED with no delegate and no submitted task
      callee = shell.shell.createProcess(request);
      caller = processContext;

      // Update state
      status = Status.QUEUED;
    }

    // Create the task
    Callable<AsyncProcess> task = new Callable<AsyncProcess>() {
      public AsyncProcess call() throws Exception {
        try {
          // Canceled -> Canceled
          // Queued   -> Evaluating
          ShellResponse response;
          synchronized (lock) {
            switch (status) {
              case CANCELED:
                // Do nothing it was canceled in the mean time
                response = ShellResponse.cancelled();
                break;
              case QUEUED:
                // Ok we are going to run it
                status = Status.EVALUATING;
                response = null;
                break;
              default:
                // Just in case but this can only be called by the queue
                throw new AssertionError();
            }
          }

          // Execute the process if we are in evaluating state
          if (response == null) {
            // Here the status could already be in status canceled
            // it is a race condition, execution still happens
            // but the callback of the callee to the end method will make the process
            // terminate and use a cancel response
            try {
              callee.execute(context);
              response = ShellResponse.ok();
            }
            catch (Throwable t) {
              response = ShellResponse.internalError("Unexpected throwable when executing process", t);
            }
          }

          // Make the callback
          // When the process was already ended, the context rejects this second call with an
          // IllegalStateException: it is deliberately ignored so that ending twice has no effect
          try {
            context.end(response);
          }
          catch (Throwable ignored) {
            // Intentionally ignored, see above
          }

          // We return this but we don't really care for now
          return AsyncProcess.this;
        }
        finally {
          synchronized (shell.lock) {
            shell.processes.remove(AsyncProcess.this);
          }
        }
      }
    };

    // Submit the task, or terminate right away when the shell is closed
    boolean notifyCancelled = false;
    synchronized (shell.lock) {
      if (!shell.closed) {
        shell.executor.submit(task);
        shell.processes.add(this);
      } else {
        synchronized (lock) {
          notifyCancelled = status != Status.TERMINATED;
          status = Status.TERMINATED;
        }
      }
    }

    // The caller is notified after both locks are released: the callback is foreign code and
    // must not run while the shell lock is held
    if (notifyCancelled) {
      caller.end(ShellResponse.cancelled());
    }
  }

  /**
   * Cancels this process. A queued process is only marked as canceled, an evaluating process
   * is marked as canceled and its delegate is asked to cancel, and a process that is already
   * canceled or terminated is left untouched.
   *
   * @throws IllegalStateException if the process was not scheduled for execution yet
   */
  public void cancel() {
    // Constructed -> ISE
    // Evaluating  -> Canceled
    // Queued      -> Canceled
    // Canceled    -> Canceled
    // Terminated  -> Terminated
    boolean cancel;
    synchronized (lock) {
      switch (status) {
        case CONSTRUCTED:
          throw new IllegalStateException("Cannot call cancel on process that was not scheduled for execution yet");
        case QUEUED:
          status = Status.CANCELED;
          cancel = false;
          break;
        case EVALUATING:
          status = Status.CANCELED;
          cancel = true;
          break;
        default:
          cancel = false;
          break;
      }
    }

    // The delegate is cancelled once the process lock has been released
    if (cancel) {
      callee.cancel();
    }
  }
}