1 package org.csc.phynixx.connection;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 import org.csc.phynixx.common.cast.ImplementorUtils;
25 import org.csc.phynixx.common.logger.IPhynixxLogger;
26 import org.csc.phynixx.common.logger.PhynixxLogManager;
27 import org.csc.phynixx.loggersystem.logrecord.IDataRecordReplay;
28 import org.csc.phynixx.loggersystem.logrecord.IXADataRecorder;
29
30 import java.util.ArrayList;
31 import java.util.List;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 abstract class PhynixxManagedConnectionGuard<C extends IPhynixxConnection> implements IPhynixxManagedConnection<C>, IXADataRecorderAware, ICloseable , IAutoCommitAware{
69
70 private static final IPhynixxLogger LOG = PhynixxLogManager.getLogger(PhynixxManagedConnectionGuard.class);
71
72 private C connection = null;
73
74 private Class<C> connectionInterface;
75
76
77 private volatile boolean transactionalData = false;
78
79 private volatile boolean synchronizedConnection=true;
80
81 private volatile boolean closed = false;
82
83 private CloseStrategy closeStrategy;
84
85 private List<IPhynixxManagedConnectionListener<C>> listeners = new ArrayList<IPhynixxManagedConnectionListener<C>>();
86
87
88 final Long id;
89
90 private volatile Long boundThreadId =null;
91
92 protected PhynixxManagedConnectionGuard(long id, Class<C> connectionInterface, C connection, CloseStrategy<C> closeStrategy) {
93 this.id = id;
94 this.connectionInterface = connectionInterface;
95 this.setConnection(connection);
96 this.closeStrategy=closeStrategy;
97 this.bindToCurrentThread();
98 }
99
100 private void bindToCurrentThread() {
101 this.boundThreadId= Thread.currentThread().getId();
102 }
103
104 private void releaseThreadBinding() {
105 this.boundThreadId=null;
106 }
107
108 private void checkThreadBinding() {
109 long currentThreadBinding = Thread.currentThread().getId();
110 if( this.boundThreadId!=null && currentThreadBinding!=this.boundThreadId) {
111 throw new IllegalStateException("Connection is bound to Thread "+this.boundThreadId+" but called by Thread "+ currentThreadBinding);
112 }
113 }
114
115 @Override
116 public boolean equals(Object o) {
117 if (this == o) return true;
118 if (o == null || !(o instanceof IPhynixxManagedConnection)) return false;
119
120 IPhynixxManagedConnection that = (IPhynixxManagedConnection) o;
121 return that.getManagedConnectionId() == this.getManagedConnectionId();
122 }
123
124 @Override
125 public int hashCode() {
126 return id.hashCode();
127 }
128
129 @Override
130 public void setSynchronized(boolean state) {
131 this.synchronizedConnection=state;
132 }
133
134 @Override
135 public boolean isSynchronized() {
136 return this.synchronizedConnection;
137 }
138
139 @Override
140 public long getManagedConnectionId() {
141 return id;
142 }
143
144 public String toString() {
145 if (hasCoreConnection()) {
146 return this.getCoreConnection().toString();
147 }
148 return "no core connection";
149 }
150
151 @Override
152 public boolean hasTransactionalData() {
153 return transactionalData;
154 }
155
156 public boolean isClosed() {
157 return closed;
158 }
159
160
161 void setClosed(boolean closed) {
162 this.closed = closed;
163 this.transactionalData=false;
164 }
165
166 @Override
167 public void reopen() {
168
169 if(connection==null) {
170 throw new IllegalStateException("Connection is already set free");
171 }
172 if(this.hasTransactionalData()) {
173 LOG.warn("Connection " + this + " has tranactional data and has to be closed safely");
174 }
175 this.setClosed(false);
176 this.reset();
177 this.bindToCurrentThread();
178 }
179
180
181
182
183
184
185
186
187
188
189
190 abstract protected IPhynixxManagedConnection<C> getObservableProxy();
191
192 @Override
193 public C toConnection() {
194 return ImplementorUtils.cast(this.getObservableProxy(), this.connectionInterface);
195 }
196
197 private void setConnection(C con) {
198 if ((this.connection == null && con == null) ||
199 (this.connection != null && this.connection.equals(con)) ) {
200 return;
201 }
202 this.connection = con;
203 }
204
205
206 public IXADataRecorder getXADataRecorder() {
207 if (hasCoreConnection() && ImplementorUtils.isImplementationOf(getCoreConnection(), IXADataRecorderAware.class)) {
208 this.checkThreadBinding();
209 return ImplementorUtils.cast(getCoreConnection(), IXADataRecorderAware.class).getXADataRecorder();
210 }
211 return null;
212
213 }
214
215 @Override
216 public IDataRecordReplay recoverReplayListener() {
217 if (hasCoreConnection() && ImplementorUtils.isImplementationOf(getCoreConnection(), IXADataRecorderAware.class)) {
218 return ImplementorUtils.cast(getCoreConnection(), IXADataRecorderAware.class).recoverReplayListener();
219 }
220 return null;
221 }
222
223 public void setXADataRecorder(IXADataRecorder dataRecorder) {
224 if (hasCoreConnection() && ImplementorUtils.isImplementationOf(getCoreConnection(), IXADataRecorderAware.class)) {
225 ImplementorUtils.cast(getCoreConnection(), IXADataRecorderAware.class).setXADataRecorder(dataRecorder);
226 }
227 }
228
229 @Override
230 public C getCoreConnection() {
231 if( this.connection==null) {
232 throw new IllegalStateException("Connection is already set free and is invalid");
233 }
234 return this.connection;
235 }
236
237
238
239
240 @Override
241 public void close() {
242 if (!this.isClosed() && hasCoreConnection()) {
243 this.closeStrategy.close(this);
244 }
245 }
246
247
248
249
250
251
252
253
254 @Override
255 public void free() {
256 try {
257 if(hasCoreConnection()) {
258 this.getCoreConnection().close();
259 }
260 this.fireConnectionFreed();
261 this.boundThreadId= Thread.currentThread().getId();
262
263 } finally {
264 this.setClosed(true);
265
266 setTransactionalData(false);
267
268
269
270 this.releaseThreadBinding();
271 }
272
273 }
274
275 @Override
276 public boolean hasCoreConnection() {
277 return this.connection!=null;
278 }
279
280
281
282
283
284 public void release() {
285 try {
286 if(hasCoreConnection()) {
287 checkThreadBinding();
288 this.setClosed(true);
289 this.getCoreConnection().reset();
290 this.fireConnectionReleased();
291
292 }
293 } finally {
294 this.setClosed(true);
295
296
297 setTransactionalData(false);
298
299 this.releaseThreadBinding();
300 }
301 }
302
303
304 public boolean isAutoCommit() {
305 if (hasCoreConnection()) {
306 checkThreadBinding();
307 if(ImplementorUtils.isImplementationOf(this.getCoreConnection(), IAutoCommitAware.class)) {
308 return ImplementorUtils.cast(this.getCoreConnection(),IAutoCommitAware.class).isAutoCommit();
309 }
310 }
311 return false;
312
313 }
314
315 public void setAutoCommit(boolean autoCommit) {
316 if (hasCoreConnection()) {
317 checkThreadBinding();
318 if(ImplementorUtils.isImplementationOf(this.getCoreConnection(), IAutoCommitAware.class)) {
319 ImplementorUtils.cast(this.getCoreConnection(),IAutoCommitAware.class).setAutoCommit(autoCommit);
320 }
321 }
322 }
323
324
325
326
327
328
329 public void reset() {
330 if (hasCoreConnection()) {
331
332 checkThreadBinding();
333 this.getCoreConnection().reset();
334 setTransactionalData(false);
335
336 this.fireConnectionReset();
337 }
338 }
339
340 public void commit(boolean onePhaseCommit) {
341
342 if(!hasTransactionalData()) {
343 return;
344 }
345 fireConnectionCommitting(onePhaseCommit);
346 if (hasCoreConnection()) {
347 checkThreadBinding();
348 if( onePhaseCommit) {
349 this.getCoreConnection().prepare();
350 }
351 this.getCoreConnection().commit();
352 setTransactionalData(false);
353 }
354 this.fireConnectionCommitted(onePhaseCommit);
355
356 }
357
358 public void commit() {
359 this.commit(true);
360
361 }
362
363
364 public void prepare() {
365
366 if(!hasTransactionalData()) {
367 return;
368 }
369
370 this.fireConnectionPreparing();
371 if (hasCoreConnection()) {
372 checkThreadBinding();
373 this.getCoreConnection().prepare();
374 }
375 this.fireConnectionPrepared();
376 }
377
378
379 public void rollback() {
380
381 if(!hasTransactionalData()) {
382 return;
383 }
384
385 fireConnectionRollingBack();
386 if (hasCoreConnection()) {
387 checkThreadBinding();
388 this.getCoreConnection().rollback();
389 setTransactionalData(false);
390
391 }
392 this.fireConnectionRolledback();
393 }
394
395 private void setTransactionalData(boolean state) {
396 this.transactionalData=state;
397 }
398
399
400
401
402
403 @Override
404 public void recover() {
405
406
407 if (this.getCoreConnection() == null || !ImplementorUtils.isImplementationOf(getCoreConnection(), IXADataRecorderAware.class)) {
408 return;
409 }
410
411 IXADataRecorderAware con = ImplementorUtils.cast(getCoreConnection(), IXADataRecorderAware.class);
412
413
414 IXADataRecorder msgLogger = this.getXADataRecorder();
415 if (msgLogger.isEmpty()) {
416 return;
417 }
418 this.fireConnectionRecovering();
419 IDataRecordReplay dataRecordReplay = con.recoverReplayListener();
420
421 if( dataRecordReplay==null) {
422 throw new IllegalStateException("IPhynixxConnection.recoverReplayListener() has to provide a IDataRecordReplay to be recovered");
423 }
424
425
426 msgLogger.replayRecords(dataRecordReplay);
427
428 this.fireConnectionRecovered();
429
430 }
431
432
433 interface IEventDeliver<X extends IPhynixxConnection,Y extends IManagedConnectionEvent<X>> {
434 void fireEvent(IPhynixxManagedConnectionListener<X> listener, Y event);
435 }
436
437
438 public void addConnectionListener(IPhynixxManagedConnectionListener<C> listener) {
439 if (!listeners.contains(listener)) {
440 this.listeners.add(listener);
441 }
442 }
443
444 public synchronized void removeConnectionListener(IPhynixxManagedConnectionListener<C> listener) {
445
446 this.listeners.remove(listener);
447 }
448
449
450
451
452
453
454
455 private void fireEvent(IEventDeliver<C, IManagedConnectionEvent<C>> deliver) {
456 fireEventWithException(deliver, null);
457 }
458
459
460
461
462
463
464
465
466 private void fireEventWithException(IEventDeliver<C, IManagedConnectionEvent<C>> deliver, Exception exception) {
467 ManagedPhynixxConnectionEvent<C> event = new ManagedPhynixxConnectionEvent<C>(getObservableProxy(), exception);
468 deliverEvent(deliver, event);
469 }
470
471
472
473
474
475
476
477
478 private <E extends IManagedConnectionEvent<C>> void deliverEvent(IEventDeliver<C, E> deliver, E event) {
479
480 List<IPhynixxManagedConnectionListener<C>> tmp = new ArrayList(this.listeners);
481 for (int i = 0; i < tmp.size(); i++) {
482 IPhynixxManagedConnectionListener<C> listener = tmp.get(i);
483 if (LOG.isDebugEnabled()) {
484 LOG.debug("ConnectionPhynixxGuard " + event + " called listener " + listener + " on " + deliver);
485 }
486 deliver.fireEvent(listener, event);
487 }
488 }
489
490
491 protected void fireConnectionReleased() {
492 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
493 public void fireEvent(IPhynixxManagedConnectionListener<C> listener, IManagedConnectionEvent<C> event) {
494 listener.connectionReleased(event);
495 }
496
497 public String toString() {
498 return "connectionReleased";
499 }
500 };
501 fireEvent(deliver);
502 }
503
504 protected void fireConnectionErrorOccurred(final Exception exception) {
505 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
506 public void fireEvent(IPhynixxManagedConnectionListener<C> listener, IManagedConnectionEvent<C> event) {
507 listener.connectionErrorOccurred(event);
508 }
509
510 public String toString() {
511 return "connectionErrorOccurred";
512 }
513 };
514 fireEventWithException(deliver, exception);
515 }
516
517 protected void fireConnectionRequiresTransactionFinished() {
518 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
519 public void fireEvent(IPhynixxManagedConnectionListener<C> listener, IManagedConnectionEvent<C> event) {
520 listener.connectionRequiresTransaction(event);
521 }
522
523 public String toString() {
524 return "connectionRequiresTransaction";
525 }
526 };
527 fireEvent(deliver);
528 }
529
530
531 protected void fireConnectionRequiresTransaction() {
532
533 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
534 public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
535 listener.connectionRequiresTransaction(event);
536 }
537
538 public String toString() {
539 return "connectionRequiresTransaction";
540 }
541 };
542 fireEvent(deliver);
543
544 this.setTransactionalData(true);
545 }
546
547 protected void fireConnectionRequiresTransactionExecuted(Exception exception) {
548 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
549 public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
550 listener.connectionRequiresTransactionExecuted(event);
551 }
552
553 public String toString() {
554 return "connectionRequiresTransactionExecuted";
555 }
556 };
557 fireEventWithException(deliver, exception);
558 }
559
560 protected void fireConnectionRequiresTransactionExecuted() {
561 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
562 public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
563 listener.connectionRequiresTransactionExecuted(event);
564 }
565
566 public String toString() {
567 return "connectionRequiresTransactionExecuted";
568 }
569 };
570 fireEvent(deliver);
571 }
572
573
574 protected void fireConnectionRolledback() {
575 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
576 public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
577 listener.connectionRolledback(event);
578 }
579
580 public String toString() {
581 return "connectionRolledback";
582 }
583 };
584 fireEvent(deliver);
585 }
586
587
588
589 protected void fireConnectionPreparing() {
590 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
591 public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
592 listener.connectionPreparing(event);
593 }
594
595 public String toString() {
596 return "connectionPreparing";
597 }
598 };
599 fireEvent(deliver);
600 }
601
602 protected void fireConnectionPrepared() {
603 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
604 public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
605 listener.connectionPrepared(event);
606 }
607
608 public String toString() {
609 return "connectionPrepared";
610 }
611 };
612 fireEvent(deliver);
613 }
614
615 protected void fireConnectionCommitting(boolean onePhase) {
616 ManagedPhynixxConnectionCommitEvent<C> event = new ManagedPhynixxConnectionCommitEvent<C>(getObservableProxy(), onePhase);
617 IEventDeliver<C, IManagedConnectionCommitEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionCommitEvent<C>>() {
618 public void fireEvent(IPhynixxManagedConnectionListener<C> listener, IManagedConnectionCommitEvent<C> event) {
619 listener.connectionCommitting(event);
620 }
621
622 public String toString() {
623 return "connectionCommitting";
624 }
625 };
626 deliverEvent(deliver, event);
627 }
628
629 protected void fireConnectionCommitted(boolean onePhase) {
630 ManagedPhynixxConnectionCommitEvent<C> event = new ManagedPhynixxConnectionCommitEvent<C>(getObservableProxy(), onePhase);
631 IEventDeliver<C, IManagedConnectionCommitEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionCommitEvent<C>>() {
632 public void fireEvent(IPhynixxManagedConnectionListener<C> listener, IManagedConnectionCommitEvent<C> event) {
633 listener.connectionCommitted(event);
634 }
635
636 public String toString() {
637 return "connectionCommitted";
638 }
639 };
640 deliverEvent(deliver, event);
641 }
642
643
644 protected void fireConnectionFreed() {
645 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
646 public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
647 listener.connectionFreed(event);
648 }
649
650 public String toString() {
651 return "connectionFreed";
652 }
653 };
654 fireEvent(deliver);
655 }
656
657
658
659 protected void fireConnectionRecovering() {
660 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
661 public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
662 listener.connectionRecovering(event);
663 }
664
665 public String toString() {
666 return "connectionRecovering";
667 }
668 };
669 fireEvent(deliver);
670 }
671
672 protected void fireConnectionRollingBack() {
673 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
674 public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
675 listener.connectionRollingBack(event);
676 }
677
678 public String toString() {
679 return "connectionRollingBack";
680 }
681 };
682 fireEvent(deliver);
683 }
684
685 protected void fireConnectionRecovered() {
686 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C, IManagedConnectionEvent<C>>() {
687 public void fireEvent(IPhynixxManagedConnectionListener listener, IManagedConnectionEvent event) {
688 listener.connectionRecovered(event);
689 }
690
691 public String toString() {
692 return "connectionRecovered";
693 }
694 };
695 fireEvent(deliver);
696 }
697 private void fireConnectionReset() {
698
699 IEventDeliver<C, IManagedConnectionEvent<C>> deliver = new IEventDeliver<C,IManagedConnectionEvent<C>>() {
700 public void fireEvent(IPhynixxManagedConnectionListener<C> listener, IManagedConnectionEvent<C> event) {
701 listener.connectionReset(event);
702 }
703
704 public String toString() {
705 return "connectionRest";
706 }
707 };
708 fireEvent(deliver);
709 }
710
711 }