View Javadoc

1   package org.csc.phynixx.tutorial;
2   
3   /*
4    * #%L
5    * phynixx-common
6    * %%
7    * Copyright (C) 2014 csc
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  
24  import org.csc.phynixx.common.exceptions.DelegatedRuntimeException;
25  import org.csc.phynixx.common.io.LogRecordReader;
26  import org.csc.phynixx.common.io.LogRecordWriter;
27  import org.csc.phynixx.connection.RequiresTransaction;
28  import org.csc.phynixx.loggersystem.logrecord.IDataRecord;
29  import org.csc.phynixx.loggersystem.logrecord.IDataRecordReplay;
30  import org.csc.phynixx.loggersystem.logrecord.IXADataRecorder;
31  
32  import java.io.File;
33  import java.io.IOException;
34  import java.io.RandomAccessFile;
35  import java.util.ArrayList;
36  import java.util.List;
37  
38  /**
39   * Basisklasse zur Verwaltung von Filezugriffen.
40   * <p/>
41   * A RandomAccessFile provides random access to the file's content.
42   */
43  public class TAEnabledUTFWriterImpl implements TAEnabledUTFWriter {
44  
45  
46      private transient String lockToken;
47      /**
48       * Das RandomAccessFile, dass zum Schreiben u. Lesen geoeffnet wird.
49       */
50      private UTFWriter utfWriter = null;
51  
52      private IXADataRecorder xaDataRecorder;
53  
54      private boolean autoCommit=false;
55  
56      private transient long rollbackPosition;
57  
58      private final String connectionId;
59  
60      public TAEnabledUTFWriterImpl(String connectionId, UTFWriter writer) throws Exception {
61  
62          this.connectionId = connectionId;
63          this.utfWriter= writer;
64          init();
65      }
66  
67      /**
68       * locks the sharedFile and inits the rollback  pointer
69       */
70      private void init() throws Exception{
71          // try to lock the writer
72          this.lockToken= this.getUTFWriter().lock();
73  
74          this.rollbackPosition= this.getUTFWriter().size();
75      }
76  
77      public String getConnectionId() {
78          return connectionId;
79      }
80  
81      public boolean isAutoCommit() {
82          return autoCommit;
83      }
84  
85      public void setAutoCommit(boolean autoCommit) {
86          this.autoCommit = autoCommit;
87      }
88  
89      /**
90       * Schliesst die Datei und den FileChannel
91       */
92      public void close() {
93          if(lockToken!=null) {
94              this.getUTFWriter().unlock(this.lockToken);
95              this.lockToken=null;
96          }
97      }
98  
99      /**
100      * zeigt an, ob die Instanz geschlossen ist
101      *
102      * @return true wenn die Datei geschlossen ist
103      */
104     public boolean isClosed() {
105         return (this.utfWriter == null);
106     }
107 
108 
109     @Override
110     @RequiresTransaction
111     public void resetContent() throws IOException {
112 
113         if (this.isClosed()) {
114             throw new IllegalStateException("Writer is closed");
115         }
116         this.getUTFWriter().resetContent();
117 
118         this.rollbackPosition = 0;
119 
120         if (this.getXADataRecorder() != null) {
121             LogRecordWriter logRecordWriter = new LogRecordWriter().writeLong(position());
122             this.getXADataRecorder().writeRollbackData(logRecordWriter.toByteArray());
123         }
124     }
125 
126     @Override
127     @RequiresTransaction
128     public TAEnabledUTFWriter write(String value) throws IOException {
129         if (value == null) {
130             throw new IllegalArgumentException("value must not be null");
131         }
132         this.getUTFWriter().write(value);
133 
134         return this;
135     }
136 
137     @Override
138     public List<String> readContent() throws IOException {
139 
140       return this.getUTFWriter().readContent();
141     }
142 
143 
144     UTFWriter getUTFWriter() {
145         if (this.isClosed()) {
146             throw new IllegalStateException("RandomAccessFile is close");
147         }
148 
149         return utfWriter;
150     }
151 
152 
153     /**
154      * bereitet die Writer zur Wiederverwendung vor
155      */
156     @Override
157     public void reset() {
158         try {
159             this.close();
160             this.init();
161         } catch (Exception e) {
162             throw new DelegatedRuntimeException(e);
163         }
164 
165     }
166 
167 
168     @Override
169     public void commit() {
170         try {
171             LogRecordWriter logRecordWriter = new LogRecordWriter().writeLong(position());
172             this.getXADataRecorder().writeRollforwardData(logRecordWriter.toByteArray());
173         } catch (Exception e) {
174             throw new DelegatedRuntimeException(e);
175         }
176     }
177 
178     private long position() throws IOException {
179         return this.utfWriter.position();
180     }
181 
182     @Override
183     public void rollback() {
184         try {
185             this.getUTFWriter().restoreSize(this.rollbackPosition);
186         } catch (IOException e) {
187             throw new DelegatedRuntimeException(e);
188         }
189     }
190 
191     /**
192      * definiert den gueltigen Zustand nach commit
193      */
194     @Override
195     public void prepare() {
196 
197     }
198 
199     @Override
200     public String toString() {
201         return "TAEnabledUTFWriterImpl{" +
202                 "connectionId='" + connectionId + '\'' +
203                 '}';
204     }
205 
206     /**
207      * @param xaDataRecorder
208      */
209     @Override
210     public void setXADataRecorder(IXADataRecorder xaDataRecorder) {
211         this.xaDataRecorder = xaDataRecorder;
212     }
213 
214     @Override
215     public IXADataRecorder getXADataRecorder() {
216         return xaDataRecorder;
217     }
218 
219 
220     private class DataRecordReplay implements IDataRecordReplay {
221         @Override
222         public void notifyNoMoreData() {
223 
224         }
225 
226 
227         @Override
228         public void replayRollback(IDataRecord record) {
229             LogRecordReader logRecordReader = new LogRecordReader(record.getData()[0]);
230             try {
231                 String fileName= logRecordReader.readUTF();
232                 long filePosition=logRecordReader.readLong();
233                 TAEnabledUTFWriterImpl.this.getUTFWriter().restoreSize(filePosition);
234                 TAEnabledUTFWriterImpl.this.rollbackPosition= filePosition;
235             } catch (IOException e) {
236                 throw new DelegatedRuntimeException(e);
237             }
238         }
239 
240         @Override
241         public void replayRollforward(IDataRecord record) {
242 
243         }
244     }
245 
246     @Override
247     public IDataRecordReplay recoverReplayListener() {
248         return new DataRecordReplay();
249     }
250 }