Project

General

Profile

Improvement #2150 » StreamReader.java

Henning Blohm, 04.01.2023 12:37

 
1
/*
2
 * z2-Environment
3
 * 
4
 * Copyright(c) ZFabrik Software GmbH & Co. KG
5
 * 
6
 * contact@zfabrik.de
7
 * 
8
 * http://www.z2-environment.eu
9
 */
10
package com.zfabrik.impl.workers;
11

    
12
import java.io.IOException;
13
import java.io.Reader;
14
import java.util.logging.Level;
15
import java.util.logging.Logger;
16

    
17
/**
18
 * @author hb
19
 * 
20
 * Working with ordinary streams is not working properly, since sometimes you
21
 * cannot close it (blocks) or the reading thread cannot be interrupted.
22
 * <ol>
23
 * <li>http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4514257</li>
24
 * </ol>
25
 * 
26
 */
27
public class StreamReader implements Runnable {
28
	private final static String[] STATES = new String[] { "STOPPING", "INACTIVE", "RUNNING", "DECOUPLED" };
29
	private final static short STOPPING = 0;
30
	private final static short INACTIVE = 1;
31
	private final static short RUNNING = 2;
32
	private final static short DECOUPLED = 3;
33
	private final static int INITIAL_BUFFER_SIZE = 1024;
34
	private final static int MAX_BUFFER_SIZE = 10*1024*1024;
35
	private Reader in;
36
	private short state = INACTIVE;
37
	private Thread thread;
38
	private IStreamEventHandler handler;
39
	private Logger logger;
40
	private boolean processing = false;
41
	private Object lock = new Object();
42

    
43
	/**
44
	 * @param in
45
	 * @param logger
46
	 * @param pcr
47
	 */
48
	public StreamReader(Reader in, Logger logger, IStreamEventHandler handler) {
49
		super();
50
		this.in = in;
51
		this.handler = handler;
52
		this.logger = logger;
53
	}
54

    
55
	private short _state() {
56
		synchronized (this.lock) {
57
			return this.state;
58
		}
59
	}
60

    
61
	private short _state(short ns) {
62
		synchronized (this.lock) {
63
			short h = this.state;
64
			this.state = ns;
65
			return h;
66
		}
67
	}
68

    
69
	public void run() {
70
		long count = 0;
71
		try {
72
			this.thread = Thread.currentThread();
73
			_state(RUNNING);
74
			int c = -2;
75
			StringBuilder buffer = new StringBuilder(INITIAL_BUFFER_SIZE);
76
			while ((_state() >= RUNNING) && ((c = this.in.read()) >= 0)) {
77
				count++;
78
				if (c == 10 || buffer.length()>=MAX_BUFFER_SIZE) {
79
					try {
80
						synchronized (this.lock) {
81
							this.processing = true;
82
						}
83
						this.handler.process(buffer.toString());
84
					} catch (Exception e) {
85
						if (this.logger != null)
86
							this.logger.log(Level.WARNING, "Error in line processing of handler", e);
87
					} finally {
88
						synchronized (this.lock) {
89
							this.processing = false;
90
						}
91
					}
92
					buffer.setLength(0);
93
				} else
94
				if (c==13) {
95
					// ignore
96
				} else {
97
					buffer.append((char) c);
98
				}
99
			}
100
			if (_state()==RUNNING) {
101
				this.logger.warning("Unexpected end of worker stream!");
102
			}
103
			if (buffer.length() > 0) {
104
				try {
105
					this.handler.process(buffer.toString());
106
				} catch (Exception e) {
107
					if (this.logger != null)
108
						this.logger.log(Level.WARNING, "Error in line processing of handler", e);
109
				}
110
			}
111
			if (this.logger != null)
112
				this.logger.fine("Left stream reader loop in state " + STATES[this.state] + " last char read=" + c);
113
		} catch (Exception e) {
114
			if (_state() >= RUNNING) {
115
				// otherwise ignore
116
				if (this.logger != null)
117
					this.logger.log(Level.WARNING, "Problem reading stream", e);
118
			} else if (_state() == STOPPING) {
119
				if (this.logger != null)
120
					this.logger.finer("Caught exception in stopping state: process probably terminated orderly");
121
			}
122
		} finally {
123
			_state(INACTIVE);
124
			try {
125
				this.handler.close();
126
			} catch (Exception e) {
127
				if (this.logger != null)
128
					this.logger.log(Level.WARNING, "Caught exception during close() call of stream event handler", e);
129
			} finally {
130
				synchronized (this.lock) {
131
					this.lock.notifyAll();
132
				}
133
				if (this.logger != null)
134
					this.logger.fine("Stream reader exiting after having read " + count + " characters");
135
			}
136
		}
137
	}
138

    
139
	public final void close(boolean sync) {
140
		synchronized (this.lock) {
141
			if (this.state >= RUNNING) {
142
				if (this.logger != null)
143
					this.logger.fine("Stopping stream reader");
144
				this.state = STOPPING;
145
				try {
146
					// bad trick... but on <=1.4.2 this seems to be the way..
147
					this.in.close();
148
				} catch (IOException ioe) {
149
					if (this.logger != null)
150
						this.logger.log(Level.WARNING, "Exception during stop-close of in stream", ioe);
151
				}
152
				if (this.logger != null)
153
					this.logger.finer("Interrupting reader thread!");
154
				if (!this.processing) {
155
					// interrupting the reader thread is a way of kicking it out
156
					// of dizzyness
157
					this.thread.interrupt();
158
				}
159
			}
160
		}
161
		if (sync) {
162
			try {
163
				synchronized (this.lock) {
164
					while (this.state == STOPPING) {
165
						if (!this.processing) {
166
							if (this.logger != null)
167
								this.logger.fine("Interrupting reader thread!");
168
							this.thread.interrupt();
169
						}
170
						this.lock.wait(200);
171
					}
172
				}
173
			} catch (InterruptedException e) {
174
				if (this.logger != null)
175
					this.logger.log(Level.WARNING, "Stream reader stop interrupted!", e);
176
			}
177
		}
178
	}
179

    
180
	public void waitFor(long timeout) {
181
		synchronized (this.lock) {
182
			if (this.state != INACTIVE) {
183
				try {
184
					long start = System.currentTimeMillis();
185
					this.lock.wait(timeout);
186
					if (this.logger != null)
187
						this.logger.fine("Stream reader terminated after " + (System.currentTimeMillis() - start) + "ms wait time");
188
				} catch (InterruptedException e) {
189
					if (this.logger != null)
190
						this.logger.log(Level.WARNING, "Wait for termination interrupted", e);
191
				}
192
			}
193
		}
194
	}
195

    
196
	public void decouple() {
197
		synchronized (this.lock) {
198
			this.state = DECOUPLED;
199
		}
200
	}
201

    
202
}
    (1-1/1)