View Javadoc

1   package org.csc.phynixx.connection.loggersystem;
2   
3   /*
4    * #%L
5    * phynixx-common
6    * %%
7    * Copyright (C) 2014 csc
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  
24  import org.csc.phynixx.common.exceptions.DelegatedRuntimeException;
25  import org.csc.phynixx.common.io.LogRecordWriter;
26  import org.csc.phynixx.connection.*;
27  import org.csc.phynixx.loggersystem.logger.IDataLoggerFactory;
28  import org.csc.phynixx.loggersystem.logrecord.*;
29  
30  import java.io.IOException;
31  import java.util.ArrayList;
32  import java.util.Iterator;
33  import java.util.List;
34  import java.util.Set;
35  
36  
37  /**
38   * this listener observes the lifecycle of a connection and associates a xaDataRecorder if necessary.
39   */
40  public class LoggerPerTransactionStrategy<C extends IPhynixxConnection & IXADataRecorderAware> extends PhynixxManagedConnectionListenerAdapter<C> implements IPhynixxLoggerSystemStrategy<C>, IPhynixxManagedConnectionListener<C> {
41  
42  
43      private IXARecorderRepository xaRecorderResource;
44  
45      /**
46       * the logger is added to all instanciated Loggers
47       */
48      public void addLoggerListener(IXARecorderResourceListener listener) {
49          // this.xaRecorderResource.addListener(listener);
50  
51      }
52  
53  
54      /**
55       * per thread a new Logger cpould be instanciated with aid of the loggerFacrory
56       *
57       * @param loggerFactory
58       * @throws Exception
59       */
60      public LoggerPerTransactionStrategy(IDataLoggerFactory loggerFactory) {
61          this.xaRecorderResource = new PhynixxXARecorderRepository(loggerFactory);
62      }
63  
64  
65      @Override
66      public void close() {
67          this.xaRecorderResource.close();
68      }
69  
70  
71      @Override
72      public void connectionRecovering(IManagedConnectionEvent<C> event) {
73          this.connectionRequiresTransaction(event);
74      }
75  
76  
77       /**
78       * Logger isn't close. If a dataRecorder is found in this phase this indicates an abnormal program flow.,
79       * <p/>
80       * Therefore the dataRecorder isn't close and keep it's content to possibly recover
81       *
82       */
83      @Override
84      public void connectionReleased(IManagedConnectionEvent<C> event) {
85  
86          // physical connection is already set free
87          if(  !event.getManagedConnection().hasCoreConnection()) {
88                  return;
89          }
90  
91          C con = event.getManagedConnection().getCoreConnection();
92          if (con == null || !(con instanceof IXADataRecorderAware)) {
93              return;
94          }
95  
96          IXADataRecorderAware messageAwareConnection = (IXADataRecorderAware) con;
97          // Transaction is closed and the xaDataRecorder is destroyed ...
98          IXADataRecorder xaDataRecorder = messageAwareConnection.getXADataRecorder();
99          if (xaDataRecorder == null) {
100             return;
101         }
102 
103         // if commit/rollback was performed, nothing happened. If no the logged data is closed but not destroy. So recovery can happen
104         xaDataRecorder.close();
105 
106     }
107 
108     /**
109      * Logger will be closed. If a dataRecorder has remaining transactional data an abnormal prgram flow is detected an
110      * the data of the logger is not destroy but kept to further recovery
111      *
112      * @param event current connection
113      */
114 
115     @Override
116     public void connectionFreed(IManagedConnectionEvent<C> event) {
117         // physical connection is already set free
118         if( !event.getManagedConnection().hasCoreConnection()) {
119             return;
120         }
121         C con = event.getManagedConnection().getCoreConnection();
122         if (con == null || !(con instanceof IXADataRecorderAware)) {
123             return;
124         }
125 
126         IXADataRecorderAware messageAwareConnection = (IXADataRecorderAware) con;
127         // Transaction is closed and the xaDataRecorder is destroyed ...
128         IXADataRecorder xaDataRecorder = messageAwareConnection.getXADataRecorder();
129         if (xaDataRecorder == null) {
130             return;
131         }
132 
133         // if commit/rollback was performed, nothing happend. If no the logged data is closed but not destroy. So recovery can happen
134 
135         if( event.getManagedConnection().hasTransactionalData()) {
136             xaRecorderResource.close(); // close without removing the revoer data
137         } else {
138             xaDataRecorder.destroy();
139         }
140         messageAwareConnection.setXADataRecorder(null);
141 
142     }
143 
144 
145 
146     /**
147      * destroys the datalogger
148      */
149     @Override
150     public void connectionRecovered(IManagedConnectionEvent<C> event) {
151         // physical connection is already set free
152         if( !event.getManagedConnection().hasCoreConnection()) {
153             return;
154         }
155         IPhynixxConnection con = event.getManagedConnection().getCoreConnection();
156         if (con == null || !(con instanceof IXADataRecorderAware)) {
157             return;
158         }
159 
160         IXADataRecorderAware messageAwareConnection = (IXADataRecorderAware) con;
161 
162 
163         // Transaction is close and the logger is destroyed ...
164         IXADataRecorder xaDataRecorder = messageAwareConnection.getXADataRecorder();
165         if (xaDataRecorder == null) {
166             return;
167         }
168         // it's my logger ....
169 
170         // the logger has to be destroyed ...
171         else {
172             xaDataRecorder.destroy();
173             messageAwareConnection.setXADataRecorder(null);
174         }
175 
176     }
177 
178     @Override
179     public void connectionRolledback(IManagedConnectionEvent<C> event) {
180         IPhynixxConnection con = event.getManagedConnection().getCoreConnection();
181         if (con == null || !(con instanceof IXADataRecorderAware)) {
182             return;
183         }
184 
185         IXADataRecorderAware messageAwareConnection = (IXADataRecorderAware) con;
186 
187         // Transaction is closed and the logger is destroyed ...
188         IXADataRecorder xaDataRecorder = messageAwareConnection.getXADataRecorder();
189         if (xaDataRecorder == null) {
190             return;
191         }
192 
193         // if the rollback is completed the rollback data isn't needed
194         xaDataRecorder.reset();
195         messageAwareConnection.setXADataRecorder(null);
196 
197         event.getManagedConnection().removeConnectionListener(this);
198     }
199 
200 
201     @Override
202     public void connectionCommitted(IManagedConnectionCommitEvent<C> event) {
203         IPhynixxConnection con = event.getManagedConnection().getCoreConnection();
204         if (con == null || !(con instanceof IXADataRecorderAware)) {
205             return;
206         }
207 
208         IXADataRecorderAware messageAwareConnection = (IXADataRecorderAware) con;
209 
210 
211         // Transaction is close and the logger is destroyed ...
212         IXADataRecorder xaDataRecorder = messageAwareConnection.getXADataRecorder();
213         if (xaDataRecorder == null) {
214             return;
215         }
216         xaDataRecorder.reset();
217         messageAwareConnection.setXADataRecorder(null);
218 
219     }
220 
221 
222     /**
223      * start sequence writes the ID of the XADataLogger to identify the content of the logger
224      *
225      * @param dataRecorder DataRecorder that uses /operates on the current physical logger
226      */
227     private void writeStartSequence(IXADataRecorder dataRecorder) throws IOException, InterruptedException {
228 
229         LogRecordWriter writer= new LogRecordWriter();
230         writer.writeLong(dataRecorder.getXADataRecorderId());
231         dataRecorder.writeRollbackData(writer.toByteArray());
232     }
233 
234     @Override
235     public void connectionRequiresTransaction(IManagedConnectionEvent<C> event) {
236         IPhynixxConnection con = event.getManagedConnection().getCoreConnection();
237         if (con == null || !(con instanceof IXADataRecorderAware)) {
238             return;
239         }
240 
241         IXADataRecorderAware messageAwareConnection = (IXADataRecorderAware) con;
242 
243 
244         // Transaction is close and the logger is destroyed ...
245         IXADataRecorder xaDataRecorder = messageAwareConnection.getXADataRecorder();
246         // it's my logger ....
247 
248         // Transaction is closed and the logger is destroyed ...
249         if (xaDataRecorder != null && xaDataRecorder.isClosed()) {
250             xaDataRecorder = null;  // gonna be refreshed
251             xaDataRecorder.close();
252         }
253 
254 
255         // refresh the datarecorder , if
256         if (xaDataRecorder == null) {
257             try {
258                 IXADataRecorder xaLogger = this.xaRecorderResource.createXADataRecorder();
259                 messageAwareConnection.setXADataRecorder(xaLogger);
260             } catch (Exception e) {
261                 // retry ...
262                 try {
263                     Thread.currentThread().sleep(1000);
264                 } catch (InterruptedException e1) {
265                 }
266                 try {
267                     IXADataRecorder xaLogger = this.xaRecorderResource.createXADataRecorder();
268                     messageAwareConnection.setXADataRecorder(xaLogger);
269                 } catch (Exception ee) {
270                     throw new DelegatedRuntimeException("creating new Logger for " + con, ee);
271                 }
272             }
273         }
274         event.getManagedConnection().addConnectionListener(this);
275 
276     }
277 
278 
279     /**
280      * recovers all incomplete dataRecorders {@link org.csc.phynixx.loggersystem.logrecord.IXADataRecorder#isCompleted()} and destroys all complete dataRecorders
281      *
282      * @return incomplete dataRecorders
283      */
284 
285     @Override
286     public List<IXADataRecorder> readIncompleteTransactions() {
287         List<IXADataRecorder> messageSequences = new ArrayList<IXADataRecorder>();
288         // recover all loggers ....
289         try {
290 
291             this.xaRecorderResource.recover();
292             Set<IXADataRecorder> xaDataRecorders = this.xaRecorderResource.getXADataRecorders();
293 
294             for (Iterator<IXADataRecorder> iterator = xaDataRecorders.iterator(); iterator.hasNext(); ) {
295                 IXADataRecorder dataRecorder = iterator.next();
296 
297                 if (!dataRecorder.isEmpty() ) {
298                     messageSequences.add(dataRecorder);
299                 } else {
300                     dataRecorder.destroy();
301                 }
302 
303             }
304             return messageSequences;
305         } catch (Exception e) {
306             throw new DelegatedRuntimeException(e);
307         }
308     }
309 
310 
311     @Override
312     public IPhynixxManagedConnection<C> decorate(IPhynixxManagedConnection<C> managedConnection) {
313         managedConnection.addConnectionListener(this);
314         return managedConnection;
315     }
316 
317 }