View Javadoc

1   package org.csc.phynixx.loggersystem.logrecord;
2   
3   /*
4    * #%L
5    * phynixx-logger
6    * %%
7    * Copyright (C) 2014 Christoph Schmidt-Casdorff
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  
24  import org.apache.commons.io.IOUtils;
25  import org.csc.phynixx.common.exceptions.DelegatedRuntimeException;
26  import org.csc.phynixx.common.exceptions.ExceptionUtils;
27  import org.csc.phynixx.common.logger.IPhynixxLogger;
28  import org.csc.phynixx.common.logger.PhynixxLogManager;
29  import org.csc.phynixx.loggersystem.logger.IDataLogger;
30  import org.csc.phynixx.loggersystem.logger.channellogger.AccessMode;
31  
32  import java.io.*;
33  
34  /**
35   * brings IXADataRecorder and dataLoger together. An instance keeps an {@link IDataLogger} representing the physical logging strategy.
36   * Its permitted (but not recommended) to shared a dataLogger . Therefor the dataLogger isn't associated to the dataRecorder but the dataRecorder operates on the current dataLogger.
37   * Created by christoph on 10.01.14.
38   */
39  public class XADataLogger {
40  
41      public boolean isClosed() {
42          return this.dataLogger.isClosed();
43      }
44  
45      public void destroy() throws IOException {
46          this.dataLogger.destroy();
47      }
48  
49      private class RecoverReplayListener implements ILogRecordReplayListener {
50  
51          private int count = 0;
52          private String loggerName;
53  
54          private PhynixxXADataRecorder dataRecorder;
55  
56          private RecoverReplayListener(PhynixxXADataRecorder dataRecorder) {
57              this.dataRecorder = dataRecorder;
58          }
59  
60          public int getCountLogRecords() {
61              return count;
62          }
63  
64  
65          public void onRecord(XALogRecordType recordType, byte[][] fieldData) {
66          if (count == 0) {
67                  dataRecorder.setMessageSequenceId(XADataLogger.this.recoverMessageSequenceId(fieldData[0]));
68              } else{
69                  short typeId = recordType.getType();
70                  switch (typeId) {
71                      case XALogRecordType.XA_START_TYPE:
72                      case XALogRecordType.XA_PREPARED_TYPE:
73                      case XALogRecordType.ROLLFORWARD_DATA_TYPE:
74                      case XALogRecordType.XA_DONE_TYPE:
75                      case XALogRecordType.USER_TYPE:
76                      case XALogRecordType.ROLLBACK_DATA_TYPE:
77                          XADataLogger.this.recoverData(dataRecorder, recordType, fieldData);
78                          break;
79                      default:
80                          LOGGER.error("Unknown LogRecordtype " + recordType);
81                          break;
82                  }
83          }
84  
85           count++;
86          }
87  
88      }
89  
90  
91      private static final IPhynixxLogger LOGGER = PhynixxLogManager.getLogger(XADataLogger.class);
92  
93      private static final int HEADER_SIZE = 8 + 4;
94  
95      private IDataLogger dataLogger;
96  
97  
98      XADataLogger(IDataLogger dataLogger) {
99          this.dataLogger = dataLogger;
100     }
101 
102     /**
103      * prepares the Logger for writing. The current content is removed.
104      *
105      * @param dataRecorder
106      * @throws IOException
107      * @throws InterruptedException
108      */
109     void prepareForWrite(PhynixxXADataRecorder dataRecorder) throws IOException, InterruptedException {
110         this.dataLogger.reopen(AccessMode.WRITE);
111         this.writeStartSequence(dataRecorder);
112     }
113 
114     /**
115      * prepares the Logger for writing.
116      * @param dataRecorder DataRecorder that uses /operates on the current physical logger
117      * @throws IOException
118      * @throws InterruptedException
119      */
120     void prepareForAppend(PhynixxXADataRecorder dataRecorder) throws IOException, InterruptedException {
121         this.dataLogger.reopen(AccessMode.APPEND);
122     }
123 
124     /**
125      * prepares the Logger for writing.
126      *
127      * @param dataRecorder DataRecorder that uses /operates on the current physical logger
128      *
129      * @throws IOException
130      * @throws InterruptedException
131      */
132     void prepareForRead(PhynixxXADataRecorder dataRecorder) throws IOException, InterruptedException {
133         this.dataLogger.reopen(AccessMode.READ);
134     }
135 
136     /**
137      *
138      *
139      * @param dataRecorder DataRecorder that uses /operates on the current physical logger
140      *
141      * @param message message to be written
142      * @throws IOException
143      */
144     void writeData(PhynixxXADataRecorder dataRecorder, IDataRecord message) throws IOException {
145         DataOutputStream io = null;
146         try {
147 
148             ByteArrayOutputStream byteIO = new ByteArrayOutputStream(HEADER_SIZE);
149             io = new DataOutputStream(byteIO);
150 
151             io.writeLong(message.getXADataRecorderId());
152             io.writeInt(message.getOrdinal().intValue());
153             byte[] header = byteIO.toByteArray();
154 
155             byte[][] data = message.getData();
156             byte[][] content = null;
157             if (data == null) {
158                 content = new byte[][]{header};
159             } else {
160                 content = new byte[data.length + 1][];
161                 content[0] = header;
162                 for (int i = 0; i < data.length; i++) {
163                     content[i + 1] = data[i];
164                 }
165             }
166 
167             try {
168                 this.dataLogger.write(message.getLogRecordType().getType(), content);
169             } catch (Exception e) {
170                 throw new DelegatedRuntimeException("writing message " + message + "\n" + ExceptionUtils.getStackTrace(e), e);
171             }
172         } finally {
173             if (io != null) {
174                 io.close();
175             }
176         }
177 
178         // Add the messageSequence to the set og messageSequences ...
179     }
180 
181     /**
182      *
183       *
184      * @param dataRecorder DataRecorder that uses /operates on the current physical logger
185      *
186      * @throws IOException
187      * @throws InterruptedException
188      */
189     void recover(PhynixxXADataRecorder dataRecorder) throws IOException, InterruptedException {
190         RecoverReplayListener listener = new RecoverReplayListener(dataRecorder);
191         dataRecorder.rewind();
192         this.dataLogger.replay(listener);
193         if (LOGGER.isDebugEnabled()) {
194             LOGGER.debug("# Records=" + listener.getCountLogRecords());
195         }
196     }
197 
198     /**
199      *
200       *
201      * @param dataRecorder DataRecorder that uses /operates on the current physical logger
202      *
203      * @param logRecordType
204      * @param fieldData
205      */
206     private void recoverData(PhynixxXADataRecorder dataRecorder, XALogRecordType logRecordType, byte[][] fieldData) {
207         if (LOGGER.isDebugEnabled()) {
208             if (fieldData == null || fieldData.length == 0) {
209                 throw new IllegalArgumentException("Record fields are empty");
210             }
211         }
212         // field 0 is header
213         byte[] headerData = fieldData[0];
214         DataInputStream io = new DataInputStream(new ByteArrayInputStream(headerData));
215         try {
216             long messageSequenceId = io.readLong();
217             int ordinal = io.readInt();
218             byte[][] content = null;
219 
220             if (fieldData.length > 1) {
221                 content = new byte[fieldData.length - 1][];
222                 for (int i = 0; i < fieldData.length - 1; i++) {
223                     content[i] = fieldData[i + 1];
224                 }
225             } else {
226                 content = new byte[][]{};
227             }
228 
229             PhynixxDataRecord msg = new PhynixxDataRecord(dataRecorder.getXADataRecorderId(), ordinal, logRecordType, content);
230             dataRecorder.addMessage(msg);
231 
232 
233         } catch (Exception e) {
234             throw new DelegatedRuntimeException(e);
235         } finally {
236             if (io != null) {
237                 IOUtils.closeQuietly(io);
238             }
239         }
240     }
241 
242     void close() {
243         try {
244             this.dataLogger.close();
245         } catch (Exception e) {
246             throw new DelegatedRuntimeException(e);
247         }
248 
249     }
250 
251     /**
252      * start sequence writes the ID of the XADataLogger to identify the content of the logger
253      *
254      * @param dataRecorder DataRecorder that uses /operates on the current physical logger
255      *
256      *
257      * deprecated
258      */
259 
260     private void writeStartSequence(IXADataRecorder dataRecorder) throws IOException, InterruptedException {
261         ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
262         try {
263             DataOutputStream dos = new DataOutputStream(byteOut);
264             dos.writeLong(dataRecorder.getXADataRecorderId());
265             dos.flush();
266         } finally {
267             if (byteOut != null) {
268                 IOUtils.closeQuietly(byteOut);
269             }
270         }
271 
272         byte[][] startSequence = new byte[1][];
273         startSequence[0] = byteOut.toByteArray();
274 
275         this.dataLogger.write(XALogRecordType.USER.getType(), startSequence);
276     }
277 
278     private long recoverMessageSequenceId(byte[] bytes) {
279         byte[] headerData = bytes;
280         DataInputStream io = new DataInputStream(new ByteArrayInputStream(headerData));
281         try {
282             long messageSequenceId = io.readLong();
283             return messageSequenceId;
284         } catch (IOException e) {
285             throw new DelegatedRuntimeException(e);
286         }
287     }
288 
289 }