View Javadoc

1   package org.csc.phynixx.loggersystem.logger.channellogger;
2   
3   /*
4    * #%L
5    * phynixx-common
6    * %%
7    * Copyright (C) 2014 csc
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  
24  import org.csc.phynixx.common.logger.IPhynixxLogger;
25  import org.csc.phynixx.common.logger.PhynixxLogManager;
26  import org.csc.phynixx.loggersystem.logger.IDataLogger;
27  import org.csc.phynixx.loggersystem.logrecord.ILogRecordReplayListener;
28  import org.csc.phynixx.loggersystem.logrecord.XALogRecordType;
29  
30  import java.io.File;
31  import java.io.FileNotFoundException;
32  import java.io.IOException;
33  import java.io.RandomAccessFile;
34  
35  
36  /**
37   * Logger uses a {@link org.csc.phynixx.loggersystem.logger.channellogger.TAEnabledRandomAccessFile}
38   * to persist the data.
39   * If the Logger is open, it holds a lock on
40   *
41   * This class is not thread safe . Use facades to protect instances
42   */
43  public class FileChannelDataLogger implements IDataLogger {
44  
45      private static final IPhynixxLogger LOG = PhynixxLogManager.getLogger(FileChannelDataLogger.class);
46  
47      /**
48       *
49       */
50      private static class FileAccessor {
51  
52          private File cachedFile = null;
53          private String absolutePathName = null;
54  
55          private FileAccessor(File cachedFile) {
56              this.cachedFile = cachedFile;
57              this.absolutePathName = this.cachedFile.getAbsolutePath();
58          }
59  
60          private FileAccessor(String absolutePathName) {
61              this.absolutePathName = absolutePathName;
62              instanciateFile();
63          }
64  
65          private void instanciateFile() {
66              if (cachedFile != null) {
67                  return;
68              }
69              this.cachedFile = new File(this.absolutePathName);
70              if (!cachedFile.exists()) {
71                  throw new IllegalStateException("File " + this.absolutePathName + " does not exist");
72              }
73          }
74  
75          String getAbsolutePathName() {
76              return absolutePathName;
77          }
78  
79          File getFile() {
80              instanciateFile();
81              return cachedFile;
82          }
83  
84          void close() {
85              this.cachedFile = null;
86          }
87  
88          @Override
89          public String toString() {
90              return "FileAccessor{" +
91                      "cachedFile=" + cachedFile +
92                      ", absolutePathName='" + absolutePathName + '\'' +
93                      '}';
94          }
95      }
96  
97      /**
98       * Open for reading and writing, as with "rw", and also require that every update to the file's content or metadata be written synchronously to the underlying storage device.
99       */
100     private static String FILE_MODE = "rw";
101 
102     private static final IPhynixxLogger LOGGER = PhynixxLogManager.getLogger(FileChannelDataLogger.class);
103 
104     private TAEnabledRandomAccessFile randomAccess = null;
105     private FileAccessor logFileAccess = null;
106 
107     private AccessMode accessMode = AccessMode.NONE;
108 
109     /**
110      * Opens a logger on base of die given logfile. A RandomAccessFile is created .
111      *
112      * The accessMode of the logger is {@link org.csc.phynixx.loggersystem.logger.channellogger.AccessMode#WRITE}, so you can start writting date
113      *
114      *
115      * @param logFileAccess
116      * @throws IOException
117      */
118     @Deprecated
119     private FileChannelDataLogger(File logFileAccess) throws IOException {
120         this.logFileAccess = new FileAccessor(logFileAccess);
121         associatedRandomAccessFile();
122         this.reopen(AccessMode.APPEND);
123     }
124 
125     /**
126      * Opens a logger on base of die given logfile. A RandomAccessFile is created .
127      *
128      * The accessMode of the logger is {@link org.csc.phynixx.loggersystem.logger.channellogger.AccessMode#WRITE}, so you can start writting date
129      *
130      *
131      * @param logFileAccess
132      * @throws IOException
133      */
134     public FileChannelDataLogger(File logFileAccess,AccessMode aceessMode) throws IOException {
135         this.logFileAccess = new FileAccessor(logFileAccess);
136         associatedRandomAccessFile();
137         this.reopen(aceessMode);
138     }
139 
140     public AccessMode getAccessMode() {
141         return this.accessMode;
142     }
143 
144     private void maybeWritten() {
145         if (this.randomAccess == null) {
146             throw new IllegalStateException("Channel is not reopen.");
147         }
148         if (this.accessMode != AccessMode.APPEND && this.accessMode != AccessMode.WRITE) {
149             throw new IllegalStateException("Channel can not be written.");
150         }
151     }
152 
153     private void maybeRead() {
154         if (this.randomAccess == null) {
155             throw new IllegalStateException("Channel is not reopen.");
156         }
157         if (this.accessMode == AccessMode.NONE) {
158             throw new IllegalStateException("Channel can not be read.");
159         }
160     }
161 
162     /**
163      * opens the logger with the specified ACCESS_MODE. If the logger isn't closed it is closed
164      * @param accessMode
165      * @throws IOException
166      */
167     @Override
168     public void open(AccessMode accessMode) throws IOException {
169 
170         associatedRandomAccessFile();
171 
172         this.reopen(accessMode);
173     }
174 
175     private void associatedRandomAccessFile() throws IOException {
176         if(!this.isClosed()) {
177            this.close();
178         }
179         RandomAccessFile raf = openRandomAccessFile(this.logFileAccess.getFile(), FILE_MODE);
180         try {
181             this.randomAccess = new TAEnabledRandomAccessFile(raf);
182             LOG.error(Thread.currentThread() +" lock on "+this.logFileAccess +" succeeded");
183         } catch(IOException e) {
184             LOG.error(Thread.currentThread() +".lock on "+this.logFileAccess,e);
185             throw e;
186         } catch(IllegalStateException e) {
187             LOG.error(Thread.currentThread()+ ".lock on "+this.logFileAccess,e);
188             throw e;
189         }
190     }
191 
192 
193     /**
194      * reopens the datalogger. It is assumed that the logger is open.
195      *
196      * @param accessMode
197      * @throws IOException
198      * @throws java.lang.IllegalStateException logger isn't open
199      */
200     public void reopen(AccessMode accessMode) throws IOException {
201 
202         if( this.randomAccess==null) {
203             associatedRandomAccessFile();
204         }
205 
206         this.accessMode = accessMode;
207 
208         // write start sequences ...
209         switch (accessMode) {
210             case READ:
211                 maybeRead();
212                 // start reading from the fiorts position
213                 this.randomAccess.rewind();
214                 break;
215             case WRITE:
216                 maybeWritten();
217                 this.randomAccess.reset();
218                 break;
219             case APPEND:
220                 maybeWritten();
221                 this.randomAccess.forwardWind();
222                 break;
223             default:
224                 throw new IllegalArgumentException("Invalid AccessMode " + accessMode);
225         }
226 
227 
228     }
229 
230     private RandomAccessFile openRandomAccessFile(File logFile, String fileMode) throws IOException {
231         try {
232             RandomAccessFile raf= new RandomAccessFile(logFile, fileMode);
233             if( LOG.isInfoEnabled()) {
234                 LOG.info(Thread.currentThread()+" lock on "+logFile +" succeeded");
235             }
236             return raf;
237         } catch(IOException e) {
238             LOG.error(Thread.currentThread() +".release lock on "+logFile,e);
239             throw e;
240         } catch(IllegalStateException e) {
241             LOG.error(Thread.currentThread()+ " release lock on "+logFile,e);
242             throw e;
243         }
244     }
245 
246 
247     private void reset() throws IOException {
248         this.randomAccess.rewind();
249         this.randomAccess.commit();
250     }
251 
252 
253     public void write(short type, byte[] record) throws IOException {
254         maybeWritten();
255         this.randomAccess.writeInt(record.length);
256         this.randomAccess.write(record);
257         this.randomAccess.commit();
258     }
259 
260     /**
261      * <pre>
262      *    +-- length of records[0]
263      *    +-- data of records[0]
264      *    +-- length of records[1]
265      *    +-- data of records[1]
266      *    . . .
267      *
268      * </pre>
269      * <p/>
270      * this format ensures that the record could be recovered
271      *
272      * @param type    a record type defined in LogRecordType.
273      * @param records
274      * @return the file position before writing the data
275      * @throws IOException
276      */
277     public long write(short type, byte[][] records) throws IOException {
278         maybeWritten();
279         long referenceKey = this.randomAccess.position();
280         // Number of records ....
281         this.randomAccess.writeInt(records.length);
282 
283         // write the type of the recods ...
284         this.randomAccess.writeShort(type);
285 
286         for (int i = 0; i < records.length; i++) {
287             if (records[i] == null) {
288                 throw new IllegalArgumentException("Records[" + i + "]==null");
289             }
290             this.randomAccess.writeInt(records[i].length);
291             this.randomAccess.write(records[i]);
292         }
293         this.randomAccess.commit();
294         return referenceKey;
295     }
296 
297 
298     public void close() throws IOException {
299         if (this.randomAccess == null) {
300             return;
301         }
302         try {
303             this.randomAccess.close();
304             LOG.error(Thread.currentThread() +".release lock on "+this.logFileAccess+" succeeded");
305         } catch(IOException e) {
306             LOG.error(Thread.currentThread() +".release lock on "+this.logFileAccess,e);
307             throw e;
308         } catch(IllegalStateException e) {
309             LOG.error(Thread.currentThread()+ " release lock on "+this.logFileAccess,e);
310             throw e;
311 
312         } finally {
313             this.logFileAccess.close();
314             this.randomAccess = null;
315         }
316     }
317 
318 
319     /**
320      * the records a recovered from the format described in {@link #write(short, byte[][])}
321      *
322      * @param replay replayListener
323      * @throws IOException
324      */
325     public void replay(ILogRecordReplayListener replay) throws IOException {
326         maybeRead();
327         this.randomAccess.rewind();
328         while (this.randomAccess.available() > (Integer.SIZE / Byte.SIZE) ) {
329 
330             int length = this.randomAccess.readInt();
331             short type = this.randomAccess.readShort();
332             XALogRecordType recordType = XALogRecordType.resolve(type);
333             byte[][] data = new byte[length][];
334             for (int i = 0; i < length; i++) {
335                 int recordSize = this.randomAccess.readInt();
336                 data[i] = this.randomAccess.read(recordSize);
337             }
338             replay.onRecord(recordType, data);
339         }
340 
341     }
342 
343     public boolean isClosed() {
344         if (this.randomAccess == null) {
345             return true;
346         }
347         return randomAccess.isClose();
348     }
349 
350 
351     @Override
352     public void destroy() throws IOException {
353         try {
354             this.close();
355         } finally {
356             if (this.logFileAccess != null) {
357                 try {
358                     this.logFileAccess.getFile().delete();
359                 } catch (Exception e) {
360                     // just log the error -- if cleanup fails, no failure of the system
361                     LOGGER.fatal(this, e);
362                 }
363             }
364         }
365     }
366 
367 
368     public String toString() {
369         return "FileChannelDataLogger (" + this.logFileAccess.getAbsolutePathName() + ")";
370     }
371 
372 
373 }