1 package org.csc.phynixx.loggersystem.logger.channellogger;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 import org.csc.phynixx.common.logger.IPhynixxLogger;
25 import org.csc.phynixx.common.logger.PhynixxLogManager;
26 import org.csc.phynixx.loggersystem.logger.IDataLogger;
27 import org.csc.phynixx.loggersystem.logrecord.ILogRecordReplayListener;
28 import org.csc.phynixx.loggersystem.logrecord.XALogRecordType;
29
30 import java.io.File;
31 import java.io.FileNotFoundException;
32 import java.io.IOException;
33 import java.io.RandomAccessFile;
34
35
36
37
38
39
40
41
42
43 public class FileChannelDataLogger implements IDataLogger {
44
45 private static final IPhynixxLogger LOG = PhynixxLogManager.getLogger(FileChannelDataLogger.class);
46
47
48
49
50 private static class FileAccessor {
51
52 private File cachedFile = null;
53 private String absolutePathName = null;
54
55 private FileAccessor(File cachedFile) {
56 this.cachedFile = cachedFile;
57 this.absolutePathName = this.cachedFile.getAbsolutePath();
58 }
59
60 private FileAccessor(String absolutePathName) {
61 this.absolutePathName = absolutePathName;
62 instanciateFile();
63 }
64
65 private void instanciateFile() {
66 if (cachedFile != null) {
67 return;
68 }
69 this.cachedFile = new File(this.absolutePathName);
70 if (!cachedFile.exists()) {
71 throw new IllegalStateException("File " + this.absolutePathName + " does not exist");
72 }
73 }
74
75 String getAbsolutePathName() {
76 return absolutePathName;
77 }
78
79 File getFile() {
80 instanciateFile();
81 return cachedFile;
82 }
83
84 void close() {
85 this.cachedFile = null;
86 }
87
88 @Override
89 public String toString() {
90 return "FileAccessor{" +
91 "cachedFile=" + cachedFile +
92 ", absolutePathName='" + absolutePathName + '\'' +
93 '}';
94 }
95 }
96
97
98
99
100 private static String FILE_MODE = "rw";
101
102 private static final IPhynixxLogger LOGGER = PhynixxLogManager.getLogger(FileChannelDataLogger.class);
103
104 private TAEnabledRandomAccessFile randomAccess = null;
105 private FileAccessor logFileAccess = null;
106
107 private AccessMode accessMode = AccessMode.NONE;
108
109
110
111
112
113
114
115
116
117
118 @Deprecated
119 private FileChannelDataLogger(File logFileAccess) throws IOException {
120 this.logFileAccess = new FileAccessor(logFileAccess);
121 associatedRandomAccessFile();
122 this.reopen(AccessMode.APPEND);
123 }
124
125
126
127
128
129
130
131
132
133
134 public FileChannelDataLogger(File logFileAccess,AccessMode aceessMode) throws IOException {
135 this.logFileAccess = new FileAccessor(logFileAccess);
136 associatedRandomAccessFile();
137 this.reopen(aceessMode);
138 }
139
140 public AccessMode getAccessMode() {
141 return this.accessMode;
142 }
143
144 private void maybeWritten() {
145 if (this.randomAccess == null) {
146 throw new IllegalStateException("Channel is not reopen.");
147 }
148 if (this.accessMode != AccessMode.APPEND && this.accessMode != AccessMode.WRITE) {
149 throw new IllegalStateException("Channel can not be written.");
150 }
151 }
152
153 private void maybeRead() {
154 if (this.randomAccess == null) {
155 throw new IllegalStateException("Channel is not reopen.");
156 }
157 if (this.accessMode == AccessMode.NONE) {
158 throw new IllegalStateException("Channel can not be read.");
159 }
160 }
161
162
163
164
165
166
167 @Override
168 public void open(AccessMode accessMode) throws IOException {
169
170 associatedRandomAccessFile();
171
172 this.reopen(accessMode);
173 }
174
175 private void associatedRandomAccessFile() throws IOException {
176 if(!this.isClosed()) {
177 this.close();
178 }
179 RandomAccessFile raf = openRandomAccessFile(this.logFileAccess.getFile(), FILE_MODE);
180 try {
181 this.randomAccess = new TAEnabledRandomAccessFile(raf);
182 LOG.error(Thread.currentThread() +" lock on "+this.logFileAccess +" succeeded");
183 } catch(IOException e) {
184 LOG.error(Thread.currentThread() +".lock on "+this.logFileAccess,e);
185 throw e;
186 } catch(IllegalStateException e) {
187 LOG.error(Thread.currentThread()+ ".lock on "+this.logFileAccess,e);
188 throw e;
189 }
190 }
191
192
193
194
195
196
197
198
199
200 public void reopen(AccessMode accessMode) throws IOException {
201
202 if( this.randomAccess==null) {
203 associatedRandomAccessFile();
204 }
205
206 this.accessMode = accessMode;
207
208
209 switch (accessMode) {
210 case READ:
211 maybeRead();
212
213 this.randomAccess.rewind();
214 break;
215 case WRITE:
216 maybeWritten();
217 this.randomAccess.reset();
218 break;
219 case APPEND:
220 maybeWritten();
221 this.randomAccess.forwardWind();
222 break;
223 default:
224 throw new IllegalArgumentException("Invalid AccessMode " + accessMode);
225 }
226
227
228 }
229
230 private RandomAccessFile openRandomAccessFile(File logFile, String fileMode) throws IOException {
231 try {
232 RandomAccessFile raf= new RandomAccessFile(logFile, fileMode);
233 if( LOG.isInfoEnabled()) {
234 LOG.info(Thread.currentThread()+" lock on "+logFile +" succeeded");
235 }
236 return raf;
237 } catch(IOException e) {
238 LOG.error(Thread.currentThread() +".release lock on "+logFile,e);
239 throw e;
240 } catch(IllegalStateException e) {
241 LOG.error(Thread.currentThread()+ " release lock on "+logFile,e);
242 throw e;
243 }
244 }
245
246
247 private void reset() throws IOException {
248 this.randomAccess.rewind();
249 this.randomAccess.commit();
250 }
251
252
253 public void write(short type, byte[] record) throws IOException {
254 maybeWritten();
255 this.randomAccess.writeInt(record.length);
256 this.randomAccess.write(record);
257 this.randomAccess.commit();
258 }
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277 public long write(short type, byte[][] records) throws IOException {
278 maybeWritten();
279 long referenceKey = this.randomAccess.position();
280
281 this.randomAccess.writeInt(records.length);
282
283
284 this.randomAccess.writeShort(type);
285
286 for (int i = 0; i < records.length; i++) {
287 if (records[i] == null) {
288 throw new IllegalArgumentException("Records[" + i + "]==null");
289 }
290 this.randomAccess.writeInt(records[i].length);
291 this.randomAccess.write(records[i]);
292 }
293 this.randomAccess.commit();
294 return referenceKey;
295 }
296
297
298 public void close() throws IOException {
299 if (this.randomAccess == null) {
300 return;
301 }
302 try {
303 this.randomAccess.close();
304 LOG.error(Thread.currentThread() +".release lock on "+this.logFileAccess+" succeeded");
305 } catch(IOException e) {
306 LOG.error(Thread.currentThread() +".release lock on "+this.logFileAccess,e);
307 throw e;
308 } catch(IllegalStateException e) {
309 LOG.error(Thread.currentThread()+ " release lock on "+this.logFileAccess,e);
310 throw e;
311
312 } finally {
313 this.logFileAccess.close();
314 this.randomAccess = null;
315 }
316 }
317
318
319
320
321
322
323
324
325 public void replay(ILogRecordReplayListener replay) throws IOException {
326 maybeRead();
327 this.randomAccess.rewind();
328 while (this.randomAccess.available() > (Integer.SIZE / Byte.SIZE) ) {
329
330 int length = this.randomAccess.readInt();
331 short type = this.randomAccess.readShort();
332 XALogRecordType recordType = XALogRecordType.resolve(type);
333 byte[][] data = new byte[length][];
334 for (int i = 0; i < length; i++) {
335 int recordSize = this.randomAccess.readInt();
336 data[i] = this.randomAccess.read(recordSize);
337 }
338 replay.onRecord(recordType, data);
339 }
340
341 }
342
343 public boolean isClosed() {
344 if (this.randomAccess == null) {
345 return true;
346 }
347 return randomAccess.isClose();
348 }
349
350
351 @Override
352 public void destroy() throws IOException {
353 try {
354 this.close();
355 } finally {
356 if (this.logFileAccess != null) {
357 try {
358 this.logFileAccess.getFile().delete();
359 } catch (Exception e) {
360
361 LOGGER.fatal(this, e);
362 }
363 }
364 }
365 }
366
367
368 public String toString() {
369 return "FileChannelDataLogger (" + this.logFileAccess.getAbsolutePathName() + ")";
370 }
371
372
373 }