View Javadoc

1   package org.csc.phynixx.xa;
2   
3   /*
4    * #%L
5    * phynixx-xa
6    * %%
7    * Copyright (C) 2014 Christoph Schmidt-Casdorff
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import javax.transaction.xa.XAException;
24  import javax.transaction.xa.XAResource;
25  import javax.transaction.xa.Xid;
26  
27  import org.csc.phynixx.common.exceptions.SampleTransactionalException;
28  import org.csc.phynixx.common.logger.IPhynixxLogger;
29  import org.csc.phynixx.common.logger.PhynixxLogManager;
30  import org.csc.phynixx.connection.IManagedConnectionEvent;
31  import org.csc.phynixx.connection.IPhynixxConnection;
32  import org.csc.phynixx.connection.IPhynixxManagedConnection;
33  import org.csc.phynixx.connection.IPhynixxManagedConnectionListener;
34  import org.csc.phynixx.connection.PhynixxManagedConnectionListenerAdapter;
35  
36  /**
37   * an XID represents a transaction branch. A transaction branch represents the
38   * context a rollback/commit affects.
39   * <p/>
40   * A transactional branch corresponds to an physical connection
41   * <p/>
42   * Created by Christoph Schmidt-Casdorff on 10.02.14.
43   */
44  class XATransactionalBranch<C extends IPhynixxConnection> extends
45  		PhynixxManagedConnectionListenerAdapter<C> implements
46  		IPhynixxManagedConnectionListener<C> {
47  
48  	private static final IPhynixxLogger LOG = PhynixxLogManager
49  			.getLogger(XATransactionalBranch.class);
50  
51  	private XAResourceProgressState progressState = XAResourceProgressState.ACTIVE;
52  
53  	private boolean readOnly = true;
54  
55  	private XAResourceActivationState activeState = XAResourceActivationState.ACTIVE;
56  
57  	private volatile boolean rollbackOnly = false;
58  
59  	private int heuristicState = 0;
60  
61  	private Xid xid;
62  
63  	private IPhynixxManagedConnection<C> managedConnection;
64  
65  	XATransactionalBranch(Xid xid,
66  			IPhynixxManagedConnection<C> managedConnection) {
67  		this.xid = xid;
68  		this.managedConnection = managedConnection;
69  		this.managedConnection.addConnectionListener(this);
70  	}
71  
72  	Xid getXid() {
73  		return xid;
74  	}
75  
76  	IPhynixxManagedConnection<C> getManagedConnection() {
77  		return managedConnection;
78  	}
79  
80  	@Override
81  	public boolean equals(Object o) {
82  		if (this == o)
83  			return true;
84  		if (o == null || getClass() != o.getClass())
85  			return false;
86  
87  		XATransactionalBranch that = (XATransactionalBranch) o;
88  
89  		if (!xid.equals(that.xid))
90  			return false;
91  
92  		return true;
93  	}
94  
95  	@Override
96  	public int hashCode() {
97  		return xid.hashCode();
98  	}
99  
100 	void suspend() {
101 		if (this.activeState == XAResourceActivationState.SUSPENDED) {
102 			return;
103 		}
104 		if (this.progressState != XAResourceProgressState.ACTIVE) {
105 			throw new IllegalStateException(
106 					"A already prepared or preparing TX can not be suspended");
107 
108 		}
109 		this.activeState = XAResourceActivationState.ACTIVE;
110 	}
111 
112 	void resume() {
113 		if (this.activeState == XAResourceActivationState.SUSPENDED) {
114 			return;
115 		}
116 		this.activeState = XAResourceActivationState.ACTIVE;
117 	}
118 
119 	public boolean isInActive() {
120 		return this.getActiveState() != XAResourceActivationState.ACTIVE;
121 	}
122 
123 	public boolean isXAProtocolFinished() {
124 		return !this.isProgressStateIn(XAResourceProgressState.ACTIVE);
125 	}
126 
127 	boolean isSuspended() {
128 		return this.getActiveState() == XAResourceActivationState.SUSPENDED;
129 	}
130 
131 	boolean isReadOnly() {
132 		return readOnly;
133 	}
134 
135 	void setReadOnly(boolean readOnly) {
136 		this.readOnly = readOnly;
137 	}
138 
139 	private XAResourceActivationState getActiveState() {
140 		return activeState;
141 	}
142 
143 	void setRollbackOnly(boolean rbOnly) {
144 		this.rollbackOnly = rbOnly;
145 	}
146 
147 	boolean isRollbackOnly() {
148 		// new Exception("Read " +
149 		// this.rollbackOnly+" ObjectId"+super.toString()).printStackTrace();
150 		return rollbackOnly;
151 	}
152 
153 	XAResourceProgressState getProgressState() {
154 		return progressState;
155 	}
156 
157 	private void setProgressState(XAResourceProgressState progressState) {
158 		this.progressState = progressState;
159 	}
160 
161 	boolean hasHeuristicOutcome() {
162 		return (this.heuristicState > 0);
163 	}
164 
165 	int getHeuristicState() {
166 		return this.heuristicState;
167 	}
168 
169 	/**
170 	 * maintains the state of the transactional branch if a commit is executed.
171 	 * <p/>
172 	 * The commit is executed
173 	 *
174 	 * @param onePhase
175 	 * @throws XAException
176 	 */
177 	void commit(boolean onePhase) throws XAException {
178 		if (this.getProgressState() == XAResourceProgressState.COMMITTED) {
179 			return;
180 		}
181 
182 		if (this.hasHeuristicOutcome()) {
183 			throw new XAException(this.heuristicState);
184 		}
185 		if (!checkTXRequirements()) {
186 			throw new SampleTransactionalException("State not transactional "
187 					+ this);
188 		}
189 
190 		this.checkRollback();
191 
192 		if (this.getProgressState() != XAResourceProgressState.ACTIVE
193 				&& onePhase) {
194 			throw new XAException(XAException.XAER_RMFAIL);
195 		} else if (this.getProgressState() != XAResourceProgressState.PREPARED
196 				&& !onePhase) {
197 			throw new XAException(XAException.XAER_RMFAIL);
198 		} else if (this.getProgressState() != XAResourceProgressState.PREPARED
199 				&& this.getProgressState() != XAResourceProgressState.ACTIVE) {
200 			throw new XAException(XAException.XAER_RMFAIL);
201 		}
202 
203 		this.setProgressState(XAResourceProgressState.COMMITTING);
204 		try {
205 			this.getManagedConnection().commit(onePhase);
206 		} catch (Exception e) {
207 			throwXAException(XAException.XAER_PROTO, e);
208 		}
209 
210 		this.setProgressState(XAResourceProgressState.COMMITTED);
211 
212 	}
213 
214 	private void checkRollback() throws XAException {
215 		if (!this.getManagedConnection().hasCoreConnection()
216 				|| this.getManagedConnection().isClosed()) {
217 			this.setProgressState(XAResourceProgressState.ROLLING_BACK);
218 			throw new XAException(XAException.XA_RBROLLBACK);
219 		}
220 
221 		if (this.isRollbackOnly()) {
222 			this.setProgressState(XAResourceProgressState.ROLLING_BACK);
223 			throw new XAException(XAException.XA_RBROLLBACK);
224 		}
225 
226 		if (LOG.isInfoEnabled()) {
227 			LOG.info("RollbackOnly " + this.rollbackOnly);
228 		}
229 	}
230 
231 	int prepare() throws XAException {
232 		// must find connection for this transaction
233 		if (!this.isProgressStateIn(XAResourceProgressState.ACTIVE)) {// must
234 																		// have
235 																		// had
236 																		// start()
237 																		// called
238 			LOG.error("XAResource " + this + " must have start() called ");
239 			throw new XAException(XAException.XAER_PROTO);
240 		}
241 
242 		if (this.progressState == XAResourceProgressState.PREPARED) {
243 			if (this.isReadOnly()) {
244 				return (XAResource.XA_RDONLY);
245 			} else {
246 				return XAResource.XA_OK;
247 			}
248 		}
249 
250 		// check if resource is READONLY -> no prepare is required
251 		if (this.isReadOnly()) {
252 			this.setProgressState(XAResourceProgressState.PREPARED);
253 			return (XAResource.XA_RDONLY);
254 		}
255 
256 		if (this.progressState == XAResourceProgressState.PREPARING) {
257 			throw new IllegalStateException("XAResource already preparing");
258 		}
259 		if (!checkTXRequirements()) {
260 			throw new SampleTransactionalException("State not transactional "
261 					+ this);
262 		}
263 
264 		if (this.hasHeuristicOutcome()) {
265 			throw new XAException(this.heuristicState);
266 		}
267 		this.checkRollback();
268 		this.setProgressState(XAResourceProgressState.PREPARING);
269 		try {
270 			this.getManagedConnection().prepare();
271 		} catch (Exception e) {
272 			throwXAException(XAException.XAER_PROTO, e);
273 		}
274 		this.setProgressState(XAResourceProgressState.PREPARED);
275 
276 		// anything to commit?
277 		if (this.isReadOnly()) {
278 			return (XAResource.XA_RDONLY);
279 		} else {
280 			return XAResource.XA_OK;
281 		}
282 	}
283 
284 	private boolean isProgressStateIn(XAResourceProgressState... states) {
285 		if (states == null || states.length == 0) {
286 			return false;
287 		}
288 		XAResourceProgressState currentStates = this.getProgressState();
289 		for (int i = 0; i < states.length; i++) {
290 			if (states[i] == currentStates) {
291 				return true;
292 			}
293 		}
294 		return false;
295 	}
296 
297 	void rollback() throws XAException {
298 		if (!checkTXRequirements()) {
299 			LOG.error("State not transactional " + this);
300 			new XAException(XAException.XAER_PROTO);
301 		}
302 
303 		LOG.debug("XATransactionalBranch:rollback for xid=" + xid
304 				+ " current Status="
305 				+ ConstantsPrinter.getStatusMessage(this.getProgressState()));
306 		switch (this.getProgressState()) {
307 		case PREPARED: // ready to do rollback
308 		case ACTIVE:
309 			try {
310 				LOG.debug("XATransactionalBranch:rollback try to perform the rollback operation");
311 				this.setProgressState(XAResourceProgressState.ROLLING_BACK);
312 
313 				// do the rollback
314 				if (this.getManagedConnection() != null
315 						&& !this.getManagedConnection().isClosed()) {
316 					try {
317 						this.getManagedConnection().rollback();
318 					} catch (Exception e) {
319 						throwXAException(XAException.XAER_PROTO, e);
320 					}
321 				} else {
322 					LOG.info("XATransactionalBranch connection already closed -> no rollback");
323 				}
324 
325 				this.setProgressState(XAResourceProgressState.ROLLEDBACK);
326 				// perform the rollback operation
327 				LOG.debug("XATransactionalBranch:rollback performed the rollback");
328 			} catch (Exception e) {
329 				LOG.error("Error in " + this + " \n" + e.getMessage());
330 				throwXAException(XAException.XAER_PROTO, e);
331 				// rollback will have been performed
332 			}
333 			break;
334 		default:
335 			LOG.error("XATransactionalBranch : rollback not permitted on TX state "
336 					+ ConstantsPrinter.getStatusMessage(this.getProgressState())
337 					+ " XAResoureTXState=" + this);
338 			throw new XAException(XAException.XAER_RMFAIL);
339 		}
340 
341 	}
342 
343 	private void throwXAException(int code, Throwable th) throws XAException {
344 		XAException xaException = new XAException(code);
345 		xaException.initCause(th);
346 	}
347 
348 	/**
349 	 * checks, if the current state complies with the requirements of an active
350 	 * TX
351 	 *
352 	 * @return
353 	 */
354 	private boolean checkTXRequirements() {
355 		if (this.getManagedConnection() == null) {
356 			return false;
357 		}
358 		if (isInActive()) {
359 			return false;
360 		}
361 
362 		return true;
363 	}
364 
365 	public void close() {
366 		this.getManagedConnection().close();
367 		this.setProgressState(XAResourceProgressState.CLOSED);
368 	}
369 
370 	@Override
371 	public void connectionRequiresTransaction(IManagedConnectionEvent<C> event) {
372 		this.setReadOnly(false);
373 	}
374 
375 	@Override
376 	public void connectionReleased(IManagedConnectionEvent<C> event) {
377 		event.getManagedConnection().removeConnectionListener(this);
378 	}
379 
380 	@Override
381 	public void connectionFreed(IManagedConnectionEvent<C> event) {
382 		event.getManagedConnection().removeConnectionListener(this);
383 	}
384 
385 }