1 package org.csc.phynixx.loggersystem.logrecord;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 import org.csc.phynixx.common.exceptions.DelegatedRuntimeException;
25
26 import java.io.IOException;
27 import java.util.ArrayList;
28 import java.util.Iterator;
29 import java.util.List;
30
31
32
33
34 public class PhynixxXADataRecorder implements IXADataRecorder {
35
36 private OrdinalGenerator ordinalGenerator = new OrdinalGenerator();
37
38 private long messageSequenceId = -1;
39
40 private List<IDataRecord> messages = new ArrayList<IDataRecord>();
41
42 private XADataLogger dataLogger;
43
44
45 private transient boolean committing = false;
46
47 private transient boolean completed = false;
48
49 private transient boolean prepared = false;
50
51 private IXADataRecorderLifecycleListener dataRecorderLifycycleListner;
52
53
54
55
56
57
58
59
60
61
62 static PhynixxXADataRecorder recoverDataRecorder(XADataLogger xaDataLogger, IXADataRecorderLifecycleListener dataRecorderLifycycleListner) {
63 try {
64 PhynixxXADataRecorder dataRecorder = new PhynixxXADataRecorder(-1, xaDataLogger, dataRecorderLifycycleListner);
65 dataRecorder.recover();
66 return dataRecorder;
67 } catch (Exception e) {
68 throw new DelegatedRuntimeException(e);
69 }
70
71 }
72
73
74
75
76
77
78 @Override
79 public void recover() {
80 try {
81 this.messages.clear();
82 this.dataLogger.prepareForRead(this);
83 this.dataLogger.recover(this);
84 } catch (Exception e) {
85 throw new DelegatedRuntimeException(e);
86 }
87 }
88
89
90
91
92
93
94
95
96
97
98 static PhynixxXADataRecorder openRecorderForWrite(long messageSequenceId, XADataLogger xaDataLogger, IXADataRecorderLifecycleListener dataRecorderLifycycleListner) {
99 PhynixxXADataRecorder dataRecorder = new PhynixxXADataRecorder(messageSequenceId, xaDataLogger, dataRecorderLifycycleListner);
100 try {
101 dataRecorder.dataLogger.prepareForWrite(dataRecorder);
102 return dataRecorder;
103 } catch (Exception e) {
104 throw new DelegatedRuntimeException(e);
105 }
106 }
107
108
109
110
111
112
113
114 private PhynixxXADataRecorder(long messageSequenceId, XADataLogger xaDataLogger, IXADataRecorderLifecycleListener dataRecorderLifycycleListner) {
115 this.messageSequenceId = messageSequenceId;
116 this.dataLogger = xaDataLogger;
117 this.dataRecorderLifycycleListner = dataRecorderLifycycleListner;
118 if (dataRecorderLifycycleListner != null) {
119 this.dataRecorderLifycycleListner.recorderDataRecorderOpened(this);
120 }
121 }
122
123
124
125
126 public List<IDataRecord> getDataRecords() {
127 return messages;
128 }
129
130
131 public boolean isCommitting() {
132 return committing;
133 }
134
135 public boolean isCompleted() {
136 return completed;
137 }
138
139 public boolean isPrepared() {
140 return prepared;
141 }
142
143 @Override
144 public boolean isEmpty() {
145 return this.messages.isEmpty();
146 }
147
148
149
150
151 public void writeRollbackData(byte[] data) {
152 this.writeRollbackData(toBytesBytes(data));
153 }
154
155
156
157
158
159 public void writeRollbackData(byte[][] data) {
160 this.createDataRecord(XALogRecordType.ROLLBACK_DATA, data);
161 }
162
163
164 public void writeRollforwardData(byte[] data) {
165 this.writeRollforwardData(toBytesBytes(data));
166 }
167
168 public void writeRollforwardData(byte[][] data) {
169 this.createDataRecord(XALogRecordType.ROLLFORWARD_DATA, data);
170 }
171
172 public void addMessage(IDataRecord message) {
173 this.establishState(message);
174 this.messages.add(message);
175 }
176
177 public void replayRecords(IDataRecordReplay replay) {
178
179 if (this.messages == null || this.messages.size() == 0) {
180 return;
181 }
182
183 for (int i = 0; i < messages.size(); i++) {
184 IDataRecord msg = this.messages.get(i);
185 if (msg.getLogRecordType().equals(XALogRecordType.ROLLBACK_DATA)) {
186 if (!this.isCompleted() && !this.isCommitting()) {
187 replay.replayRollback(msg);
188 }
189 } else if (msg.getLogRecordType().equals(XALogRecordType.ROLLFORWARD_DATA)) {
190 if (this.isCommitting()) {
191 replay.replayRollforward(msg);
192 }
193 }
194 }
195
196
197 replay.notifyNoMoreData();
198 }
199
200 @Override
201 public IDataRecord createDataRecord(XALogRecordType logRecordType, byte[] recordData) {
202 return this.createDataRecord(logRecordType, this.toBytesBytes(recordData));
203 }
204
205 @Override
206 public IDataRecord createDataRecord(XALogRecordType logRecordType, byte[][] recordData) {
207 PhynixxDataRecord msg = new PhynixxDataRecord(this.getXADataRecorderId(), this.ordinalGenerator.generate(), logRecordType, recordData);
208 try {
209 this.dataLogger.writeData(this, msg);
210 this.addMessage(msg);
211
212 return msg;
213 } catch (Exception e) {
214 throw new DelegatedRuntimeException(e);
215 }
216 }
217
218 private byte[][] toBytesBytes(byte[] recordData) {
219 byte[][] message = new byte[1][];
220 message[0] = recordData;
221 return message;
222 }
223
224 public long getMessageSequenceId() {
225 return messageSequenceId;
226 }
227
228 void setMessageSequenceId(long messageSequenceId) {
229 this.messageSequenceId = messageSequenceId;
230 }
231
232 private void establishState(IDataRecord msg) {
233 XALogRecordType logRecordType = msg.getLogRecordType();
234
235 if (this.isCommitting() && !logRecordType.equals(XALogRecordType.XA_DONE)) {
236 if (logRecordType == XALogRecordType.USER) {
237 throw new IllegalStateException("Sequence in State COMMITTING, only XA_DONE/ROLLFORWARD_DATA are accepted");
238 }
239 }
240
241 if (this.isCompleted()) {
242 throw new IllegalStateException("Sequence in State COMPLETED, no more data is accepted");
243 }
244
245 if (logRecordType.equals(XALogRecordType.XA_PREPARED)) {
246 this.committing = false;
247 this.completed = false;
248 this.prepared = true;
249 }
250
251 if (logRecordType.equals(XALogRecordType.ROLLFORWARD_DATA)) {
252 this.committing = true;
253 this.completed = false;
254 this.prepared = false;
255 }
256 if (logRecordType.equals(XALogRecordType.XA_DONE)) {
257 this.committing = false;
258 this.completed = true;
259 this.prepared = false;
260 }
261 }
262
263 public int compareTo(Object obj) {
264
265 if (this == obj)
266 return 1;
267 if (obj == null)
268 return 1;
269
270 if (getClass() != obj.getClass())
271 return 1;
272
273 final PhynixxXADataRecorder otherMsg = (PhynixxXADataRecorder) obj;
274
275 return Long.valueOf(this.getXADataRecorderId() - otherMsg.getXADataRecorderId()).intValue();
276 }
277
278 public boolean equals(Object obj) {
279 if (this == obj)
280 return true;
281 if (obj == null)
282 return false;
283 if (getClass() != obj.getClass())
284 return false;
285
286 final PhynixxXADataRecorder other = (PhynixxXADataRecorder) obj;
287 return messageSequenceId == other.messageSequenceId;
288 }
289
290
291
292
293 public long getXADataRecorderId() {
294 return this.messageSequenceId;
295 }
296
297 public int hashCode() {
298 final int prime = 31;
299 int result = 1;
300 result = prime * result + Long.valueOf(messageSequenceId).intValue();
301 return result;
302 }
303
304
305 public String toString() {
306 StringBuffer buffer = new StringBuffer(" { \n");
307 for (Iterator iterator = messages.iterator(); iterator.hasNext(); ) {
308 buffer.append('\t').append(iterator.next()).append('\n');
309 }
310 buffer.append(" }");
311 return buffer.toString();
312
313 }
314
315
316
317
318
319
320 void rewind() {
321 this.committing = false;
322 this.completed = false;
323 this.prepared = false;
324 this.messages.clear();
325 }
326
327
328
329
330
331 public void reset() {
332 this.committing = false;
333 this.completed = false;
334 this.prepared = false;
335 this.messages.clear();
336 try {
337 this.dataLogger.prepareForWrite(this);
338 } catch (Exception e) {
339 throw new DelegatedRuntimeException(e);
340 }
341
342 }
343
344
345
346
347 public void close() {
348 if (dataRecorderLifycycleListner != null) {
349 this.dataRecorderLifycycleListner.recorderDataRecorderClosed(this);
350 }
351 this.dataLogger.close();
352 this.rewind();
353 }
354
355 @Override
356 public boolean isClosed() {
357 return this.dataLogger.isClosed();
358 }
359
360
361
362
363 @Override
364 public void destroy() {
365 try {
366 this.dataLogger.destroy();
367 } catch (IOException e) {
368 throw new DelegatedRuntimeException(e);
369 }
370 }
371
372 public void messageSequenceCreated() {
373 this.dataRecorderLifycycleListner.recorderDataRecorderOpened(this);
374 }
375
376
377 }