1 package org.csc.phynixx.loggersystem.logrecord;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 import org.apache.commons.io.IOUtils;
25 import org.csc.phynixx.common.exceptions.DelegatedRuntimeException;
26 import org.csc.phynixx.common.exceptions.ExceptionUtils;
27 import org.csc.phynixx.common.logger.IPhynixxLogger;
28 import org.csc.phynixx.common.logger.PhynixxLogManager;
29 import org.csc.phynixx.loggersystem.logger.IDataLogger;
30 import org.csc.phynixx.loggersystem.logger.channellogger.AccessMode;
31
32 import java.io.*;
33
34
35
36
37
38
39 public class XADataLogger {
40
41 public boolean isClosed() {
42 return this.dataLogger.isClosed();
43 }
44
45 public void destroy() throws IOException {
46 this.dataLogger.destroy();
47 }
48
49 private class RecoverReplayListener implements ILogRecordReplayListener {
50
51 private int count = 0;
52 private String loggerName;
53
54 private PhynixxXADataRecorder dataRecorder;
55
56 private RecoverReplayListener(PhynixxXADataRecorder dataRecorder) {
57 this.dataRecorder = dataRecorder;
58 }
59
60 public int getCountLogRecords() {
61 return count;
62 }
63
64
65 public void onRecord(XALogRecordType recordType, byte[][] fieldData) {
66 if (count == 0) {
67 dataRecorder.setMessageSequenceId(XADataLogger.this.recoverMessageSequenceId(fieldData[0]));
68 } else{
69 short typeId = recordType.getType();
70 switch (typeId) {
71 case XALogRecordType.XA_START_TYPE:
72 case XALogRecordType.XA_PREPARED_TYPE:
73 case XALogRecordType.ROLLFORWARD_DATA_TYPE:
74 case XALogRecordType.XA_DONE_TYPE:
75 case XALogRecordType.USER_TYPE:
76 case XALogRecordType.ROLLBACK_DATA_TYPE:
77 XADataLogger.this.recoverData(dataRecorder, recordType, fieldData);
78 break;
79 default:
80 LOGGER.error("Unknown LogRecordtype " + recordType);
81 break;
82 }
83 }
84
85 count++;
86 }
87
88 }
89
90
91 private static final IPhynixxLogger LOGGER = PhynixxLogManager.getLogger(XADataLogger.class);
92
93 private static final int HEADER_SIZE = 8 + 4;
94
95 private IDataLogger dataLogger;
96
97
98 XADataLogger(IDataLogger dataLogger) {
99 this.dataLogger = dataLogger;
100 }
101
102
103
104
105
106
107
108
109 void prepareForWrite(PhynixxXADataRecorder dataRecorder) throws IOException, InterruptedException {
110 this.dataLogger.reopen(AccessMode.WRITE);
111 this.writeStartSequence(dataRecorder);
112 }
113
114
115
116
117
118
119
120 void prepareForAppend(PhynixxXADataRecorder dataRecorder) throws IOException, InterruptedException {
121 this.dataLogger.reopen(AccessMode.APPEND);
122 }
123
124
125
126
127
128
129
130
131
132 void prepareForRead(PhynixxXADataRecorder dataRecorder) throws IOException, InterruptedException {
133 this.dataLogger.reopen(AccessMode.READ);
134 }
135
136
137
138
139
140
141
142
143
144 void writeData(PhynixxXADataRecorder dataRecorder, IDataRecord message) throws IOException {
145 DataOutputStream io = null;
146 try {
147
148 ByteArrayOutputStream byteIO = new ByteArrayOutputStream(HEADER_SIZE);
149 io = new DataOutputStream(byteIO);
150
151 io.writeLong(message.getXADataRecorderId());
152 io.writeInt(message.getOrdinal().intValue());
153 byte[] header = byteIO.toByteArray();
154
155 byte[][] data = message.getData();
156 byte[][] content = null;
157 if (data == null) {
158 content = new byte[][]{header};
159 } else {
160 content = new byte[data.length + 1][];
161 content[0] = header;
162 for (int i = 0; i < data.length; i++) {
163 content[i + 1] = data[i];
164 }
165 }
166
167 try {
168 this.dataLogger.write(message.getLogRecordType().getType(), content);
169 } catch (Exception e) {
170 throw new DelegatedRuntimeException("writing message " + message + "\n" + ExceptionUtils.getStackTrace(e), e);
171 }
172 } finally {
173 if (io != null) {
174 io.close();
175 }
176 }
177
178
179 }
180
181
182
183
184
185
186
187
188
189 void recover(PhynixxXADataRecorder dataRecorder) throws IOException, InterruptedException {
190 RecoverReplayListener listener = new RecoverReplayListener(dataRecorder);
191 dataRecorder.rewind();
192 this.dataLogger.replay(listener);
193 if (LOGGER.isDebugEnabled()) {
194 LOGGER.debug("# Records=" + listener.getCountLogRecords());
195 }
196 }
197
198
199
200
201
202
203
204
205
206 private void recoverData(PhynixxXADataRecorder dataRecorder, XALogRecordType logRecordType, byte[][] fieldData) {
207 if (LOGGER.isDebugEnabled()) {
208 if (fieldData == null || fieldData.length == 0) {
209 throw new IllegalArgumentException("Record fields are empty");
210 }
211 }
212
213 byte[] headerData = fieldData[0];
214 DataInputStream io = new DataInputStream(new ByteArrayInputStream(headerData));
215 try {
216 long messageSequenceId = io.readLong();
217 int ordinal = io.readInt();
218 byte[][] content = null;
219
220 if (fieldData.length > 1) {
221 content = new byte[fieldData.length - 1][];
222 for (int i = 0; i < fieldData.length - 1; i++) {
223 content[i] = fieldData[i + 1];
224 }
225 } else {
226 content = new byte[][]{};
227 }
228
229 PhynixxDataRecord msg = new PhynixxDataRecord(dataRecorder.getXADataRecorderId(), ordinal, logRecordType, content);
230 dataRecorder.addMessage(msg);
231
232
233 } catch (Exception e) {
234 throw new DelegatedRuntimeException(e);
235 } finally {
236 if (io != null) {
237 IOUtils.closeQuietly(io);
238 }
239 }
240 }
241
242 void close() {
243 try {
244 this.dataLogger.close();
245 } catch (Exception e) {
246 throw new DelegatedRuntimeException(e);
247 }
248
249 }
250
251
252
253
254
255
256
257
258
259
260 private void writeStartSequence(IXADataRecorder dataRecorder) throws IOException, InterruptedException {
261 ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
262 try {
263 DataOutputStream dos = new DataOutputStream(byteOut);
264 dos.writeLong(dataRecorder.getXADataRecorderId());
265 dos.flush();
266 } finally {
267 if (byteOut != null) {
268 IOUtils.closeQuietly(byteOut);
269 }
270 }
271
272 byte[][] startSequence = new byte[1][];
273 startSequence[0] = byteOut.toByteArray();
274
275 this.dataLogger.write(XALogRecordType.USER.getType(), startSequence);
276 }
277
278 private long recoverMessageSequenceId(byte[] bytes) {
279 byte[] headerData = bytes;
280 DataInputStream io = new DataInputStream(new ByteArrayInputStream(headerData));
281 try {
282 long messageSequenceId = io.readLong();
283 return messageSequenceId;
284 } catch (IOException e) {
285 throw new DelegatedRuntimeException(e);
286 }
287 }
288
289 }