View Javadoc

1   package org.csc.phynixx.connection;
2   
3   /*
4    * #%L
5    * phynixx-connection
6    * %%
7    * Copyright (C) 2014 csc
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  
24  import org.csc.phynixx.common.cast.ImplementorUtils;
25  import org.csc.phynixx.common.logger.IPhynixxLogger;
26  import org.csc.phynixx.common.logger.PhynixxLogManager;
27  import org.csc.phynixx.loggersystem.logrecord.IDataRecordReplay;
28  import org.csc.phynixx.loggersystem.logrecord.IXADataRecorder;
29  
30  import java.util.ArrayList;
31  import java.util.List;
32  
33  
34  /**
35   * decorates a connection an takes care of calling the listener during the different phases of a transaction
36   * <p/>
37   * <pre>
38   *
39   *     Event
40   *
41   *     requiresTransaction - the connection is to be changed in a transaction an needs needs support for managing transactional data
42   *
43   *     rollingBack - roll back starts
44   *     rolledBack  - rollback is finished
45   *     preparing  - prepare starts
46   *     prepared   - prepare is finished
47   *     committing  - commit starts
48   *     committed   - commit is finished
49   *
50   *     connection reference
51   *     connection dereferenced
52   *
53   *     connection closed
54   *
55   *
56   * </pre>
57   *
58   * @param <C> Typ of the connection
59   */
60  
61  
62  /**
63   * guards all calls to methods of {@link org.csc.phynixx.connection.IPhynixxConnection} and ensures that the correct events are deliverd to the listeners.
64   * This class has the character of an abstract class and should no be instanciated.
65   *
66   * @param <C>
67   */
68  abstract class PhynixxManagedConnectionGuard<C extends IPhynixxConnection> implements IPhynixxManagedConnection<C>, IXADataRecorderAware, ICloseable , IAutoCommitAware{
69  
70      private static final IPhynixxLogger LOG = PhynixxLogManager.getLogger(PhynixxManagedConnectionGuard.class);
71  
72      private C connection = null;
73  
74      private Class<C> connectionInterface;
75  
76      // indicates, that the core connection is transactionalData
77      private volatile boolean transactionalData = false;
78  
79      private volatile boolean synchronizedConnection=true;
80  
81      private volatile boolean closed = false;
82  
83      private CloseStrategy closeStrategy;
84  
85      private List<IPhynixxManagedConnectionListener<C>> listeners = new ArrayList<IPhynixxManagedConnectionListener<C>>();
86  
87  
88      final Long id;
89  
90      private volatile Long boundThreadId =null;
91  
92      protected PhynixxManagedConnectionGuard(long id, Class<C> connectionInterface, C connection, CloseStrategy<C> closeStrategy) {
93          this.id = id;
94          this.connectionInterface = connectionInterface;
95          this.setConnection(connection);
96          this.closeStrategy=closeStrategy;
97          this.bindToCurrentThread();
98      }
99  
100     private void bindToCurrentThread() {
101         this.boundThreadId= Thread.currentThread().getId();
102     }
103     
104     private void releaseThreadBinding() {
105         this.boundThreadId=null; 
106     }
107     
108     private void checkThreadBinding() {
109         long currentThreadBinding = Thread.currentThread().getId();
110         if( this.boundThreadId!=null && currentThreadBinding!=this.boundThreadId) {
111             throw new IllegalStateException("Connection is bound to Thread "+this.boundThreadId+" but called by Thread "+ currentThreadBinding);
112         }
113     }
114 
115     @Override
116     public boolean equals(Object o) {
117         if (this == o) return true;
118         if (o == null || !(o instanceof IPhynixxManagedConnection)) return false;
119 
120         IPhynixxManagedConnection that = (IPhynixxManagedConnection) o;
121         return that.getManagedConnectionId() == this.getManagedConnectionId();
122     }
123 
124     @Override
125     public int hashCode() {
126         return id.hashCode();
127     }
128 
129     @Override
130     public void setSynchronized(boolean state) {
131         this.synchronizedConnection=state;
132     }
133 
134     @Override
135     public boolean isSynchronized() {
136         return this.synchronizedConnection;
137     }
138 
139     @Override
140     public long getManagedConnectionId() {
141         return id;
142     }
143 
144     public String toString() {
145         if (hasCoreConnection()) {
146             return this.getCoreConnection().toString();
147         }
148         return "no core connection";
149     }
150 
151     @Override
152     public boolean hasTransactionalData() {
153         return transactionalData;
154     }
155 
156     public boolean isClosed() {
157         return closed;
158     }
159 
160 
161     void setClosed(boolean closed) {
162         this.closed = closed;
163         this.transactionalData=false;
164     }
165 
166     @Override
167     public void reopen() {
168 
169         if(connection==null) {
170             throw new IllegalStateException("Connection is already set free");
171         }
172         if(this.hasTransactionalData()) {
173             LOG.warn("Connection " + this + " has tranactional data and has to be closed safely");
174         }
175         this.setClosed(false);
176         this.reset();
177         this.bindToCurrentThread();
178     }
179 
180     /**
181      * If this proxy is implemented by a DynProxy. If a referene to this object has to be propagated
182      * ({@link #fireEvent(PhynixxManagedConnectionGuard.IEventDeliver)}, it would leads to invalid references
183      * if <code>this</code> is returned but not the implementing DynaProxy.
184      * <p/>
185      * See the implementation of a java proxy.
186      *
187      * @return he object via <code>this</code> is accessible
188      * @see org.csc.phynixx.connection.DynaPhynixxManagedConnectionFactory.ConnectionPhynixxGuard
189      */
190     abstract protected IPhynixxManagedConnection<C> getObservableProxy();
191 
192     @Override
193     public C toConnection() {
194         return ImplementorUtils.cast(this.getObservableProxy(), this.connectionInterface);
195     }
196 
197     private void setConnection(C con) {
198         if ((this.connection == null && con == null) ||
199                 (this.connection != null && this.connection.equals(con))  ) {
200             return;
201         }
202         this.connection = con;
203     }
204 
205 
206     public IXADataRecorder getXADataRecorder() {
207         if (hasCoreConnection() && ImplementorUtils.isImplementationOf(getCoreConnection(), IXADataRecorderAware.class)) {
208             this.checkThreadBinding();
209             return ImplementorUtils.cast(getCoreConnection(), IXADataRecorderAware.class).getXADataRecorder();
210         }
211         return null;
212 
213     }
214 
215     @Override
216     public IDataRecordReplay recoverReplayListener() {
217         if (hasCoreConnection() && ImplementorUtils.isImplementationOf(getCoreConnection(), IXADataRecorderAware.class)) {
218             return ImplementorUtils.cast(getCoreConnection(), IXADataRecorderAware.class).recoverReplayListener();
219         }
220         return null;
221     }
222 
223     public void setXADataRecorder(IXADataRecorder dataRecorder) {
224         if (hasCoreConnection() && ImplementorUtils.isImplementationOf(getCoreConnection(), IXADataRecorderAware.class)) {
225             ImplementorUtils.cast(getCoreConnection(), IXADataRecorderAware.class).setXADataRecorder(dataRecorder);
226         }
227     }
228 
229     @Override
230     public C getCoreConnection() {
231         if( this.connection==null) {
232             throw new IllegalStateException("Connection is already set free and is invalid");
233         }
234         return this.connection;
235     }
236 
237     /**
238      * The implementation delegates to {@link org.csc.phynixx.connection.CloseStrategy}.* the real action is determined by the implementation of {@link org.csc.phynixx.connection.CloseStrategy}.
239      */
240     @Override
241     public void close() {
242         if (!this.isClosed() && hasCoreConnection()) {
243             this.closeStrategy.close(this);
244         }
245     }
246 
247     /**
248      * marks a connection a freed. This connection won't be used any more.
249      *
250      * The connection is released from the bound thread an can be bind to an other thread
251      *
252      * The thread binding is not checked as this method is called in emergency and has to be robust
253      */
254     @Override
255     public void free() {
256         try {
257         if(hasCoreConnection()) {
258             this.getCoreConnection().close();
259         }
260         this.fireConnectionFreed();
261         this.boundThreadId= Thread.currentThread().getId();
262 
263     } finally {
264         this.setClosed(true);
265         // state may be important for State-Listener, so its set after the listener did their work
266         setTransactionalData(false);
267 
268         // re-opem is not possible
269         // this.connection=null;
270         this.releaseThreadBinding();
271     }
272 
273     }
274 
275     @Override
276     public boolean hasCoreConnection() {
277         return this.connection!=null;
278     }
279 
280 
281     /**
282      * Implementation of releasing the connection from transactional context
283      */
284     public void release() {
285         try {
286             if(hasCoreConnection()) {
287                 checkThreadBinding();
288                 this.setClosed(true);
289                 this.getCoreConnection().reset();
290                 this.fireConnectionReleased();
291 
292             }
293         } finally {
294             this.setClosed(true);
295 
296             // state may be important for Stat-Listener, so its set after the listener did their work
297             setTransactionalData(false);
298 
299             this.releaseThreadBinding();
300         }
301     }
302 
303 
304     public boolean isAutoCommit() {
305         if (hasCoreConnection()) {
306             checkThreadBinding();
307             if(ImplementorUtils.isImplementationOf(this.getCoreConnection(), IAutoCommitAware.class)) {
308                 return ImplementorUtils.cast(this.getCoreConnection(),IAutoCommitAware.class).isAutoCommit();
309             }
310         }
311         return false;
312 
313     }
314 
315     public void setAutoCommit(boolean autoCommit) {
316         if (hasCoreConnection()) {
317             checkThreadBinding();
318             if(ImplementorUtils.isImplementationOf(this.getCoreConnection(), IAutoCommitAware.class)) {
319                 ImplementorUtils.cast(this.getCoreConnection(),IAutoCommitAware.class).setAutoCommit(autoCommit);
320             }
321         }
322     }
323 
324 
325     /**
326      * A xaresource's connection is  never closed but always dereferenced .
327      * As a connection proxy shields a xa resource, the current connection is not closed but dereferenced (==released)
328      */
329     public void reset() {
330         if (hasCoreConnection()) {
331 
332             checkThreadBinding();
333             this.getCoreConnection().reset();
334             setTransactionalData(false);
335             // notify the action
336             this.fireConnectionReset();
337         }
338     }
339 
340     public void commit(boolean onePhaseCommit) {
341 
342         if(!hasTransactionalData()) {
343             return;
344         }
345         fireConnectionCommitting(onePhaseCommit);
346         if (hasCoreConnection()) {
347             checkThreadBinding();
348             if( onePhaseCommit) {
349                 this.getCoreConnection().prepare();
350             }
351             this.getCoreConnection().commit();
352             setTransactionalData(false);
353         }
354         this.fireConnectionCommitted(onePhaseCommit);
355 
356     }
357 
358     public void commit() {
359         this.commit(true);
360 
361     }
362 
363 
364     public void prepare() {
365 
366         if(!hasTransactionalData()) {
367             return;
368         }
369 
370         this.fireConnectionPreparing();
371         if (hasCoreConnection()) {
372             checkThreadBinding();
373             this.getCoreConnection().prepare();
374         }
375         this.fireConnectionPrepared();
376     }
377 
378 
379     public void rollback() {
380 
381         if(!hasTransactionalData()) {
382             return;
383         }
384 
385         fireConnectionRollingBack();
386         if (hasCoreConnection()) {
387             checkThreadBinding();
388             this.getCoreConnection().rollback();
389             setTransactionalData(false);
390 
391         }
392         this.fireConnectionRolledback();
393     }
394 
395     private void setTransactionalData(boolean state) {
396         this.transactionalData=state;
397     }
398 
399 
400     /**
401      * recovers the data of the dataLogger and provides the recovered data to the connection via the replaylistener
402      */
403     @Override
404     public void recover() {
405 
406         // not revoverable
407         if (this.getCoreConnection() == null || !ImplementorUtils.isImplementationOf(getCoreConnection(), IXADataRecorderAware.class)) {
408             return;
409         }
410 
411         IXADataRecorderAware con = ImplementorUtils.cast(getCoreConnection(), IXADataRecorderAware.class);
412 
413         // the connection has to re establish the state of the message LOG
414         IXADataRecorder msgLogger = this.getXADataRecorder();
415         if (msgLogger.isEmpty()) {
416             return;
417         }
418         this.fireConnectionRecovering();
419         IDataRecordReplay dataRecordReplay = con.recoverReplayListener();
420 
421         if( dataRecordReplay==null) {
422             throw new IllegalStateException("IPhynixxConnection.recoverReplayListener() has to provide a IDataRecordReplay to be recovered");
423         }
424 
425         // msgLogger.recover();
426         msgLogger.replayRecords(dataRecordReplay);
427 
428         this.fireConnectionRecovered();
429 
430     }
431 
432 
433     interface IEventDeliver<X extends IPhynixxConnection,Y extends IManagedConnectionEvent<X>> {
434         void fireEvent(IPhynixxManagedConnectionListener<X> listener, Y event);
435     }
436 
437 
438     public void addConnectionListener(IPhynixxManagedConnectionListener<C> listener) {
439         if (!listeners.contains(listener)) {
440         this.listeners.add(listener);
441     }
442     }
443 
444     public synchronized void removeConnectionListener(IPhynixxManagedConnectionListener<C> listener) {
445 
446         this.listeners.remove(listener);
447     }
448 
449     /**
450      *
451      * @param deliver     *
452      * @see #deliverEvent(org.csc.phynixx.connection.PhynixxManagedConnectionGuard.IEventDeliver, IManagedConnectionEvent)
453      *
454      */
455     private  void fireEvent(IEventDeliver<C, IManagedConnectionEvent<C>> deliver) {
456         fireEventWithException(deliver, null);
457     }
458 
459     /**
460      * creates an event an fires at all listeners
461      * @param deliver implementaion of the firing
462      * @param exception expection added to the event if given
463      *
464      * @see #deliverEvent(org.csc.phynixx.connection.PhynixxManagedConnectionGuard.IEventDeliver, IManagedConnectionEvent)
465      */
466     private void fireEventWithException(IEventDeliver<C, IManagedConnectionEvent<C>> deliver, Exception exception) {
467         ManagedPhynixxConnectionEvent<C> event = new ManagedPhynixxConnectionEvent<C>(getObservableProxy(), exception);
468         deliverEvent(deliver, event);
469     }
470 
471 
472     /**
473      *
474      * @param deliver
475      * @param event
476      * @param <E>
477      */
478     private <E extends IManagedConnectionEvent<C>> void deliverEvent(IEventDeliver<C, E> deliver, E event) {
479         // copy all listeners as the callback may change the list of listeners ...
480         List<IPhynixxManagedConnectionListener<C>> tmp = new ArrayList(this.listeners);
481         for (int i = 0; i < tmp.size(); i++) {
482             IPhynixxManagedConnectionListener<C> listener = tmp.get(i);
483             if (LOG.isDebugEnabled()) {
484                 LOG.debug("ConnectionPhynixxGuard " + event + " called listener " + listener + " on " + deliver);
485             }
486             deliver.fireEvent(listener, event);
487         }
488     }
489 
490 
491     protected void fireConnectionReleased() {
492         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
493             public void fireEvent(IPhynixxManagedConnectionListener<C> listener, IManagedConnectionEvent<C> event) {
494                 listener.connectionReleased(event);
495             }
496 
497             public String toString() {
498                 return "connectionReleased";
499             }
500         };
501         fireEvent(deliver);
502     }
503 
504     protected void fireConnectionErrorOccurred(final Exception exception) {
505         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
506             public void fireEvent(IPhynixxManagedConnectionListener<C> listener, IManagedConnectionEvent<C> event) {
507                 listener.connectionErrorOccurred(event);
508             }
509 
510             public String toString() {
511                 return "connectionErrorOccurred";
512             }
513         };
514         fireEventWithException(deliver, exception);
515     }
516 
517     protected void fireConnectionRequiresTransactionFinished() {
518         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
519             public void fireEvent(IPhynixxManagedConnectionListener<C> listener, IManagedConnectionEvent<C> event) {
520                 listener.connectionRequiresTransaction(event);
521             }
522 
523             public String toString() {
524                 return "connectionRequiresTransaction";
525             }
526         };
527         fireEvent(deliver);
528     }
529 
530 
531     protected void fireConnectionRequiresTransaction() {
532 
533         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
534             public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
535                 listener.connectionRequiresTransaction(event);
536             }
537 
538             public String toString() {
539                 return "connectionRequiresTransaction";
540             }
541         };
542         fireEvent(deliver);
543 
544         this.setTransactionalData(true);
545     }
546 
547     protected void fireConnectionRequiresTransactionExecuted(Exception exception) {
548         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
549             public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
550                 listener.connectionRequiresTransactionExecuted(event);
551             }
552 
553             public String toString() {
554                 return "connectionRequiresTransactionExecuted";
555             }
556         };
557         fireEventWithException(deliver, exception);
558     }
559 
560     protected void fireConnectionRequiresTransactionExecuted() {
561         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
562             public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
563                 listener.connectionRequiresTransactionExecuted(event);
564             }
565 
566             public String toString() {
567                 return "connectionRequiresTransactionExecuted";
568             }
569         };
570         fireEvent(deliver);
571     }
572 
573 
574     protected void fireConnectionRolledback() {
575         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
576             public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
577                 listener.connectionRolledback(event);
578             }
579 
580             public String toString() {
581                 return "connectionRolledback";
582             }
583         };
584         fireEvent(deliver);
585     }
586 
587 
588 
589     protected void fireConnectionPreparing() {
590         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
591             public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
592                 listener.connectionPreparing(event);
593             }
594 
595             public String toString() {
596                 return "connectionPreparing";
597             }
598         };
599         fireEvent(deliver);
600     }
601 
602     protected void fireConnectionPrepared() {
603         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
604             public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
605                 listener.connectionPrepared(event);
606             }
607 
608             public String toString() {
609                 return "connectionPrepared";
610             }
611         };
612         fireEvent(deliver);
613     }
614 
615     protected void fireConnectionCommitting(boolean onePhase) {
616         ManagedPhynixxConnectionCommitEvent<C> event = new ManagedPhynixxConnectionCommitEvent<C>(getObservableProxy(), onePhase);
617         IEventDeliver<C, IManagedConnectionCommitEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionCommitEvent<C>>() {
618             public void fireEvent(IPhynixxManagedConnectionListener<C> listener, IManagedConnectionCommitEvent<C> event) {
619                 listener.connectionCommitting(event);
620             }
621 
622             public String toString() {
623                 return "connectionCommitting";
624             }
625         };
626         deliverEvent(deliver, event);
627     }
628 
629     protected void fireConnectionCommitted(boolean onePhase) {
630         ManagedPhynixxConnectionCommitEvent<C> event = new ManagedPhynixxConnectionCommitEvent<C>(getObservableProxy(), onePhase);
631         IEventDeliver<C, IManagedConnectionCommitEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionCommitEvent<C>>() {
632             public void fireEvent(IPhynixxManagedConnectionListener<C> listener, IManagedConnectionCommitEvent<C> event) {
633                 listener.connectionCommitted(event);
634             }
635 
636             public String toString() {
637                 return "connectionCommitted";
638             }
639         };
640         deliverEvent(deliver, event);
641     }
642 
643 
644     protected void fireConnectionFreed() {
645         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
646             public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
647                 listener.connectionFreed(event);
648             }
649 
650             public String toString() {
651                 return "connectionFreed";
652             }
653         };
654         fireEvent(deliver);
655     }
656 
657 
658 
659     protected void fireConnectionRecovering() {
660         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
661             public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
662                 listener.connectionRecovering(event);
663             }
664 
665             public String toString() {
666                 return "connectionRecovering";
667             }
668         };
669         fireEvent(deliver);
670     }
671 
672     protected void fireConnectionRollingBack() {
673         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
674             public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
675                 listener.connectionRollingBack(event);
676             }
677 
678             public String toString() {
679                 return "connectionRollingBack";
680             }
681         };
682         fireEvent(deliver);
683     }
684 
685     protected void fireConnectionRecovered() {
686         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
687             public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
688                 listener.connectionRecovered(event);
689             }
690 
691             public String toString() {
692                 return "connectionRecovered";
693             }
694         };
695         fireEvent(deliver);
696     }
697     private void fireConnectionReset() {
698 
699         IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C,IManagedConnectionEvent<C>>() {
700             public void fireEvent(IPhynixxManagedConnectionListener<C> listener, IManagedConnectionEvent<C> event) {
701                 listener.connectionReset(event);
702             }
703 
704             public String toString() {
705                 return "connectionRest";
706             }
707         };
708         fireEvent(deliver);
709     }
710 
711 }