View Javadoc

1   package org.csc.phynixx.loggersystem.logrecord;
2   
3   /*
4    * #%L
5    * phynixx-common
6    * %%
7    * Copyright (C) 2014 csc
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  
24  import org.csc.phynixx.common.exceptions.DelegatedRuntimeException;
25  
26  import java.io.IOException;
27  import java.util.ArrayList;
28  import java.util.Iterator;
29  import java.util.List;
30  
31  /**
32   * thisclas is backwards bound to the PhynixxXARecorderRepository. The current class manages LogEntries but not know the persistence logger.
33   */
34  public class PhynixxXADataRecorder implements IXADataRecorder {
35  
36      private OrdinalGenerator ordinalGenerator = new OrdinalGenerator();
37  
38      private long messageSequenceId = -1;
39  
40      private List<IDataRecord> messages = new ArrayList<IDataRecord>();
41  
42      private XADataLogger dataLogger;
43  
44  
45      private transient boolean committing = false;
46  
47      private transient boolean completed = false;
48  
49      private transient boolean prepared = false;
50  
51      private IXADataRecorderLifecycleListener dataRecorderLifycycleListner;
52  
53  
54      /**
55       * opens an Recorder for read. If no recorder with the given ID exists recorder with no data is returned
56       *
57       * @param xaDataLogger Strategy to persist the records
58       * @return
59       * @throws IOException
60       * @throws InterruptedException
61       */
62      static PhynixxXADataRecorder recoverDataRecorder(XADataLogger xaDataLogger, IXADataRecorderLifecycleListener dataRecorderLifycycleListner) {
63          try {
64              PhynixxXADataRecorder dataRecorder = new PhynixxXADataRecorder(-1, xaDataLogger, dataRecorderLifycycleListner);
65              dataRecorder.recover();
66              return dataRecorder;
67          } catch (Exception e) {
68              throw new DelegatedRuntimeException(e);
69          }
70  
71      }
72  
73  
74      /**
75       * recovers the dataRecorder
76       * all messages are removed and all the messsages of the logger are recoverd
77       */
78      @Override
79      public void recover() {
80          try {
81              this.messages.clear();
82              this.dataLogger.prepareForRead(this);
83              this.dataLogger.recover(this);
84          } catch (Exception e) {
85              throw new DelegatedRuntimeException(e);
86          }
87      }
88  
89      /**
90       * opens an Recorder for read. If no recorder with the given ID exists recorder with no data is returned
91       *
92       * @param messageSequenceId
93       * @param xaDataLogger
94       * @return
95       * @throws IOException
96       * @throws InterruptedException
97       */
98      static PhynixxXADataRecorder openRecorderForWrite(long messageSequenceId, XADataLogger xaDataLogger, IXADataRecorderLifecycleListener dataRecorderLifycycleListner) {
99          PhynixxXADataRecorder dataRecorder = new PhynixxXADataRecorder(messageSequenceId, xaDataLogger, dataRecorderLifycycleListner);
100         try {
101             dataRecorder.dataLogger.prepareForWrite(dataRecorder);
102             return dataRecorder;
103         } catch (Exception e) {
104             throw new DelegatedRuntimeException(e);
105         }
106     }
107 
108     /**
109      * entweder wird dem Recorder explizit ein Logger zugeordnet, poder via recyvcling aus Logger ...
110      *
111      * @param messageSequenceId
112      * @param xaDataLogger
113      */
114     private PhynixxXADataRecorder(long messageSequenceId, XADataLogger xaDataLogger, IXADataRecorderLifecycleListener dataRecorderLifycycleListner) {
115         this.messageSequenceId = messageSequenceId;
116         this.dataLogger = xaDataLogger;
117         this.dataRecorderLifycycleListner = dataRecorderLifycycleListner;
118         if (dataRecorderLifycycleListner != null) {
119             this.dataRecorderLifycycleListner.recorderDataRecorderOpened(this);
120         }
121     }
122 
123     /* (non-Javadoc)
124      * @see de.csc.xaresource.sample.loggersystem.ILogMessageSequence#getMessages()
125      */
126     public List<IDataRecord> getDataRecords() {
127         return messages;
128     }
129 
130 
131     public boolean isCommitting() {
132         return committing;
133     }
134 
135     public boolean isCompleted() {
136         return completed;
137     }
138 
139     public boolean isPrepared() {
140         return prepared;
141     }
142 
143     @Override
144     public boolean isEmpty() {
145         return this.messages.isEmpty();
146     }
147 
148     /**
149      * create a new Message with the given data
150      */
151     public void writeRollbackData(byte[] data) {
152         this.writeRollbackData(toBytesBytes(data));
153     }
154 
155 
156     /**
157      * create a new Message with the given data
158      */
159     public  void writeRollbackData(byte[][] data) {
160        this.createDataRecord(XALogRecordType.ROLLBACK_DATA, data);
161     }
162 
163 
164     public void writeRollforwardData(byte[] data) {
165         this.writeRollforwardData(toBytesBytes(data));
166     }
167 
168     public void writeRollforwardData(byte[][] data) {
169        this.createDataRecord(XALogRecordType.ROLLFORWARD_DATA, data);
170     }
171 
172     public void addMessage(IDataRecord message) {
173         this.establishState(message);
174         this.messages.add(message);
175     }
176 
177     public void replayRecords(IDataRecordReplay replay) {
178 
179         if (this.messages == null || this.messages.size() == 0) {
180             return;
181         }
182 
183         for (int i = 0; i < messages.size(); i++) {
184             IDataRecord msg = this.messages.get(i);
185             if (msg.getLogRecordType().equals(XALogRecordType.ROLLBACK_DATA)) {
186                 if (!this.isCompleted() && !this.isCommitting()) {
187                     replay.replayRollback(msg);
188                 }
189             } else if (msg.getLogRecordType().equals(XALogRecordType.ROLLFORWARD_DATA)) {
190                 if (this.isCommitting()) {
191                     replay.replayRollforward(msg);
192                 }
193             }
194         }
195 
196         // acknowledge data finished
197         replay.notifyNoMoreData();
198     }
199 
200     @Override
201     public IDataRecord createDataRecord(XALogRecordType logRecordType, byte[] recordData) {
202         return this.createDataRecord(logRecordType, this.toBytesBytes(recordData));
203     }
204 
205     @Override
206     public IDataRecord createDataRecord(XALogRecordType logRecordType, byte[][] recordData) {
207         PhynixxDataRecord msg = new PhynixxDataRecord(this.getXADataRecorderId(), this.ordinalGenerator.generate(), logRecordType, recordData);
208         try {
209             this.dataLogger.writeData(this, msg);
210             this.addMessage(msg);
211 
212             return msg;
213         } catch (Exception e) {
214             throw new DelegatedRuntimeException(e);
215         }
216     }
217 
218     private byte[][] toBytesBytes(byte[] recordData) {
219         byte[][] message = new byte[1][];
220         message[0] = recordData;
221         return message;
222     }
223 
224     public long getMessageSequenceId() {
225         return messageSequenceId;
226     }
227 
228     void setMessageSequenceId(long messageSequenceId) {
229         this.messageSequenceId = messageSequenceId;
230     }
231 
232     private void establishState(IDataRecord msg) {
233         XALogRecordType logRecordType = msg.getLogRecordType();
234 
235         if (this.isCommitting() && !logRecordType.equals(XALogRecordType.XA_DONE)) {
236             if (logRecordType == XALogRecordType.USER) {
237                 throw new IllegalStateException("Sequence in State COMMITTING, only XA_DONE/ROLLFORWARD_DATA are accepted");
238             }
239         }
240 
241         if (this.isCompleted()) {
242             throw new IllegalStateException("Sequence in State COMPLETED, no more data is accepted");
243         }
244 
245         if (logRecordType.equals(XALogRecordType.XA_PREPARED)) {
246             this.committing = false;
247             this.completed = false;
248             this.prepared = true;
249         }
250 
251         if (logRecordType.equals(XALogRecordType.ROLLFORWARD_DATA)) {
252             this.committing = true;
253             this.completed = false;
254             this.prepared = false;
255         }
256         if (logRecordType.equals(XALogRecordType.XA_DONE)) {
257             this.committing = false;
258             this.completed = true;
259             this.prepared = false;
260         }
261     }
262 
263     public int compareTo(Object obj) {
264 
265         if (this == obj)
266             return 1;
267         if (obj == null)
268             return 1;
269 
270         if (getClass() != obj.getClass())
271             return 1;
272 
273         final PhynixxXADataRecorder otherMsg = (PhynixxXADataRecorder) obj;
274 
275         return Long.valueOf(this.getXADataRecorderId() - otherMsg.getXADataRecorderId()).intValue();
276     }
277 
278     public boolean equals(Object obj) {
279         if (this == obj)
280             return true;
281         if (obj == null)
282             return false;
283         if (getClass() != obj.getClass())
284             return false;
285 
286         final PhynixxXADataRecorder other = (PhynixxXADataRecorder) obj;
287         return messageSequenceId == other.messageSequenceId;
288     }
289 
290     /* (non-Javadoc)
291      * @see de.csc.xaresource.sample.loggersystem.ILogMessageSequence#getMessageId()
292      */
293     public long getXADataRecorderId() {
294         return this.messageSequenceId;
295     }
296 
297     public int hashCode() {
298         final int prime = 31;
299         int result = 1;
300         result = prime * result + Long.valueOf(messageSequenceId).intValue();
301         return result;
302     }
303 
304 
305     public String toString() {
306         StringBuffer buffer = new StringBuffer(" { \n");
307         for (Iterator iterator = messages.iterator(); iterator.hasNext(); ) {
308             buffer.append('\t').append(iterator.next()).append('\n');
309         }
310         buffer.append(" }");
311         return buffer.toString();
312 
313     }
314 
315 
316     /**
317      * rewinds the dataLoger to start without removing the persistent content of the rdataLogger
318      * the cached messages are removed but can re re-build by analysing the content of the dataLogger.
319      */
320     void rewind() {
321         this.committing = false;
322         this.completed = false;
323         this.prepared = false;
324         this.messages.clear();
325     }
326 
327 
328     /**rewinds the recorder and resets the dataLogger. Information of the dataLogger is removed
329      *
330      */
331     public void reset() {
332         this.committing = false;
333         this.completed = false;
334         this.prepared = false;
335         this.messages.clear();
336         try {
337             this.dataLogger.prepareForWrite(this);
338         } catch (Exception e) {
339             throw new DelegatedRuntimeException(e);
340         }
341 
342     }
343 
344     /**
345      * closes the current datalogger and rewinds
346      */
347     public void close() {
348         if (dataRecorderLifycycleListner != null) {
349             this.dataRecorderLifycycleListner.recorderDataRecorderClosed(this);
350         }
351         this.dataLogger.close();
352         this.rewind();
353     }
354 
355     @Override
356     public boolean isClosed() {
357         return this.dataLogger.isClosed();
358     }
359 
360     /**
361      * destroyes the current dataLogger
362      */
363     @Override
364     public void destroy() {
365         try {
366             this.dataLogger.destroy();
367         } catch (IOException e) {
368             throw new DelegatedRuntimeException(e);
369         }
370     }
371 
372     public void messageSequenceCreated() {
373         this.dataRecorderLifycycleListner.recorderDataRecorderOpened(this);
374     }
375 
376 
377 }