1 package org.csc.phynixx.connection.loggersystem;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 import org.csc.phynixx.common.exceptions.DelegatedRuntimeException;
25 import org.csc.phynixx.common.io.LogRecordWriter;
26 import org.csc.phynixx.connection.*;
27 import org.csc.phynixx.loggersystem.logger.IDataLoggerFactory;
28 import org.csc.phynixx.loggersystem.logrecord.*;
29
30 import java.io.IOException;
31 import java.util.ArrayList;
32 import java.util.Iterator;
33 import java.util.List;
34 import java.util.Set;
35
36
37
38
39
40 public class LoggerPerTransactionStrategy<C extends IPhynixxConnection & IXADataRecorderAware> extends PhynixxManagedConnectionListenerAdapter<C> implements IPhynixxLoggerSystemStrategy<C>, IPhynixxManagedConnectionListener<C> {
41
42
43 private IXARecorderRepository xaRecorderResource;
44
45
46
47
48 public void addLoggerListener(IXARecorderResourceListener listener) {
49
50
51 }
52
53
54
55
56
57
58
59
60 public LoggerPerTransactionStrategy(IDataLoggerFactory loggerFactory) {
61 this.xaRecorderResource = new PhynixxXARecorderRepository(loggerFactory);
62 }
63
64
65 @Override
66 public void close() {
67 this.xaRecorderResource.close();
68 }
69
70
71 @Override
72 public void connectionRecovering(IManagedConnectionEvent<C> event) {
73 this.connectionRequiresTransaction(event);
74 }
75
76
77
78
79
80
81
82
83 @Override
84 public void connectionReleased(IManagedConnectionEvent<C> event) {
85
86
87 if( !event.getManagedConnection().hasCoreConnection()) {
88 return;
89 }
90
91 C con = event.getManagedConnection().getCoreConnection();
92 if (con == null || !(con instanceof IXADataRecorderAware)) {
93 return;
94 }
95
96 IXADataRecorderAware messageAwareConnection = (IXADataRecorderAware) con;
97
98 IXADataRecorder xaDataRecorder = messageAwareConnection.getXADataRecorder();
99 if (xaDataRecorder == null) {
100 return;
101 }
102
103
104 xaDataRecorder.close();
105
106 }
107
108
109
110
111
112
113
114
115 @Override
116 public void connectionFreed(IManagedConnectionEvent<C> event) {
117
118 if( !event.getManagedConnection().hasCoreConnection()) {
119 return;
120 }
121 C con = event.getManagedConnection().getCoreConnection();
122 if (con == null || !(con instanceof IXADataRecorderAware)) {
123 return;
124 }
125
126 IXADataRecorderAware messageAwareConnection = (IXADataRecorderAware) con;
127
128 IXADataRecorder xaDataRecorder = messageAwareConnection.getXADataRecorder();
129 if (xaDataRecorder == null) {
130 return;
131 }
132
133
134
135 if( event.getManagedConnection().hasTransactionalData()) {
136 xaRecorderResource.close();
137 } else {
138 xaDataRecorder.destroy();
139 }
140 messageAwareConnection.setXADataRecorder(null);
141
142 }
143
144
145
146
147
148
149 @Override
150 public void connectionRecovered(IManagedConnectionEvent<C> event) {
151
152 if( !event.getManagedConnection().hasCoreConnection()) {
153 return;
154 }
155 IPhynixxConnection con = event.getManagedConnection().getCoreConnection();
156 if (con == null || !(con instanceof IXADataRecorderAware)) {
157 return;
158 }
159
160 IXADataRecorderAware messageAwareConnection = (IXADataRecorderAware) con;
161
162
163
164 IXADataRecorder xaDataRecorder = messageAwareConnection.getXADataRecorder();
165 if (xaDataRecorder == null) {
166 return;
167 }
168
169
170
171 else {
172 xaDataRecorder.destroy();
173 messageAwareConnection.setXADataRecorder(null);
174 }
175
176 }
177
178 @Override
179 public void connectionRolledback(IManagedConnectionEvent<C> event) {
180 IPhynixxConnection con = event.getManagedConnection().getCoreConnection();
181 if (con == null || !(con instanceof IXADataRecorderAware)) {
182 return;
183 }
184
185 IXADataRecorderAware messageAwareConnection = (IXADataRecorderAware) con;
186
187
188 IXADataRecorder xaDataRecorder = messageAwareConnection.getXADataRecorder();
189 if (xaDataRecorder == null) {
190 return;
191 }
192
193
194 xaDataRecorder.reset();
195 messageAwareConnection.setXADataRecorder(null);
196
197 event.getManagedConnection().removeConnectionListener(this);
198 }
199
200
201 @Override
202 public void connectionCommitted(IManagedConnectionCommitEvent<C> event) {
203 IPhynixxConnection con = event.getManagedConnection().getCoreConnection();
204 if (con == null || !(con instanceof IXADataRecorderAware)) {
205 return;
206 }
207
208 IXADataRecorderAware messageAwareConnection = (IXADataRecorderAware) con;
209
210
211
212 IXADataRecorder xaDataRecorder = messageAwareConnection.getXADataRecorder();
213 if (xaDataRecorder == null) {
214 return;
215 }
216 xaDataRecorder.reset();
217 messageAwareConnection.setXADataRecorder(null);
218
219 }
220
221
222
223
224
225
226
227 private void writeStartSequence(IXADataRecorder dataRecorder) throws IOException, InterruptedException {
228
229 LogRecordWriter writer= new LogRecordWriter();
230 writer.writeLong(dataRecorder.getXADataRecorderId());
231 dataRecorder.writeRollbackData(writer.toByteArray());
232 }
233
234 @Override
235 public void connectionRequiresTransaction(IManagedConnectionEvent<C> event) {
236 IPhynixxConnection con = event.getManagedConnection().getCoreConnection();
237 if (con == null || !(con instanceof IXADataRecorderAware)) {
238 return;
239 }
240
241 IXADataRecorderAware messageAwareConnection = (IXADataRecorderAware) con;
242
243
244
245 IXADataRecorder xaDataRecorder = messageAwareConnection.getXADataRecorder();
246
247
248
249 if (xaDataRecorder != null && xaDataRecorder.isClosed()) {
250 xaDataRecorder = null;
251 xaDataRecorder.close();
252 }
253
254
255
256 if (xaDataRecorder == null) {
257 try {
258 IXADataRecorder xaLogger = this.xaRecorderResource.createXADataRecorder();
259 messageAwareConnection.setXADataRecorder(xaLogger);
260 } catch (Exception e) {
261
262 try {
263 Thread.currentThread().sleep(1000);
264 } catch (InterruptedException e1) {
265 }
266 try {
267 IXADataRecorder xaLogger = this.xaRecorderResource.createXADataRecorder();
268 messageAwareConnection.setXADataRecorder(xaLogger);
269 } catch (Exception ee) {
270 throw new DelegatedRuntimeException("creating new Logger for " + con, ee);
271 }
272 }
273 }
274 event.getManagedConnection().addConnectionListener(this);
275
276 }
277
278
279
280
281
282
283
284
285 @Override
286 public List<IXADataRecorder> readIncompleteTransactions() {
287 List<IXADataRecorder> messageSequences = new ArrayList<IXADataRecorder>();
288
289 try {
290
291 this.xaRecorderResource.recover();
292 Set<IXADataRecorder> xaDataRecorders = this.xaRecorderResource.getXADataRecorders();
293
294 for (Iterator<IXADataRecorder> iterator = xaDataRecorders.iterator(); iterator.hasNext(); ) {
295 IXADataRecorder dataRecorder = iterator.next();
296
297 if (!dataRecorder.isEmpty() ) {
298 messageSequences.add(dataRecorder);
299 } else {
300 dataRecorder.destroy();
301 }
302
303 }
304 return messageSequences;
305 } catch (Exception e) {
306 throw new DelegatedRuntimeException(e);
307 }
308 }
309
310
311 @Override
312 public IPhynixxManagedConnection<C> decorate(IPhynixxManagedConnection<C> managedConnection) {
313 managedConnection.addConnectionListener(this);
314 return managedConnection;
315 }
316
317 }