1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import datetime
23 import cgi
24
25 from twisted.internet import defer, protocol, reactor
26 from twisted.python.util import InsensitiveDict
27 from twisted.web import http
28
29 from flumotion.common import log
30 from flumotion.common import errors
31 from flumotion.component.misc.httpserver.httpcached import common
32 from flumotion.component.misc.httpserver.httpcached import http_utils
33
34
35 LOG_CATEGORY = "stream-provider"
36
37 USER_AGENT = "FlumotionClient/0.1"
38
39
41 if ts:
42 return datetime.datetime.fromtimestamp(ts).isoformat()
43 return "???"
44
45
47 """
48 Provides information about a stream in a standard way.
49 The information is retrieved by parsing HTTP headers.
50 """
51
53 self.expires = None
54 self.mtime = None
55 self.length = 0
56 self.start = 0
57 self.size = 0
58 self.mimeType = None
59
60 headers = InsensitiveDict(headers)
61
62 encoding = headers.get("Transfer-Encoding", None)
63 if encoding == 'chunked':
64 raise errors.FlumotionError("Chunked transfer not supported")
65
66 expires = headers.get("Expires", None)
67 if expires is not None:
68 try:
69 self.expires = http.stringToDatetime(expires)
70 except:
71 self.expires = 0
72
73 lastmod = headers.get("Last-Modified", None)
74 if lastmod is not None:
75 self.mtime = http.stringToDatetime(lastmod)
76
77 range = headers.get("Content-Range", None)
78 length = headers.get("Content-Length", None)
79 if range is not None:
80 start, end, total = http.parseContentRange(range)
81 self.start = start
82 self.length = total
83 if length is not None:
84 self.size = int(length)
85 else:
86 self.size = end - start
87 elif length is not None:
88 self.length = int(length)
89 self.size = int(length)
90 else:
91 raise errors.FlumotionError("Can't get length/size from headers",
92 headers)
93
94 ctype = headers.get("Content-Type", None)
95 if ctype is not None:
96 self.mimeType, _pdict = cgi.parse_header(ctype)
97
98
100 """
101 Allows retrieval of data streams using HTTP 1.0.
102 """
103
104 logCategory = LOG_CATEGORY
105
106 - def __init__(self, connTimeout=0, idleTimeout=0):
107 self.connTimeout = connTimeout
108 self.idleTimeout = idleTimeout
109
110 - def retrieve(self, consumer, url, proxyAddress=None, proxyPort=None,
111 ifModifiedSince=None, ifUnmodifiedSince=None,
112 start=None, size=None):
113 self.log("Requesting %s%s%s%s%s%s",
114 size and (" %d bytes" % size) or "",
115 start and (" starting at %d" % start) or "",
116 (size or start) and " from " or "",
117 url.toString(),
118 ifModifiedSince and (" if modified since %s"
119 % ts2str(ifModifiedSince)) or "",
120 ifUnmodifiedSince and (" if not modified since %s"
121 % ts2str(ifUnmodifiedSince)) or "")
122
123 getter = StreamGetter(consumer, url,
124 ifModifiedSince, ifUnmodifiedSince,
125 start, size, self.idleTimeout)
126 getter.connect(proxyAddress, proxyPort, self.connTimeout)
127 return getter
128
129
130 -class StreamGetter(protocol.ClientFactory, http.HTTPClient, log.Loggable):
131 """
132 Retrieves a stream using HTTP 1.0.
133
134 This class is at the same time a Factory and a Protocol,
135 this can be done because it's a client and in twisted
136 client factories only create on protocol.
137
138 The outcome, the stream info and stream data is forwarded
139 to a common.StreamConsumer instance given at creating time.
140
141 It supports range requests and some conditional request types
142 (ifModified and ifUnmodified).
143 """
144
145 logCategory = LOG_CATEGORY
146
147 HTTP_METHOD = 'GET'
148
149 host = None
150 port = None
151
152 - def __init__(self, consumer, url,
153 ifModifiedSince=None, ifUnmodifiedSince=None,
154 start=None, size=None, timeout=0):
155 self.consumer = consumer
156 self.url = url
157
158 self.ifModifiedSince = ifModifiedSince
159 self.ifUnmodifiedSince = ifUnmodifiedSince
160
161 self.start = start
162 self.size = size
163 self.timeout = timeout
164
165 self.headers = {}
166 self.peer = None
167 self.status = None
168 self.info = None
169
170 self._connected = False
171 self._canceled = False
172 self._remaining = None
173 self._idlecheck = None
174
175 self.logName = common.log_id(self)
176
179
180
181
182 - def connect(self, proxyAddress=None, proxyPort=None, timeout=0):
195
197 if not self.paused and self.transport is not None:
198 self.pauseProducing()
199 self.log("Request paused for %s", self.url)
200
202 if self.paused and self.transport is not None:
203 self.resumeProducing()
204 self.log("Request resumed for %s", self.url)
205
207 if self._connected and self.transport is not None:
208 self.transport.loseConnection()
209 self._cancelIdleCheck()
210 self.log("Request canceled for %s", self.url)
211 self._canceled = True
212
213
214
216 assert self.peer is None, "Protocol already built"
217 self.peer = addr
218 return self
219
222
224 self.log("Connection made for %s", self.url)
225 self.sendCommand(self.HTTP_METHOD, self.url.location)
226 self.sendHeader('Host', self.url.host)
227 self.sendHeader('User-Agent', USER_AGENT)
228 self.sendHeader('Connection', "close")
229
230 if self.ifModifiedSince:
231 datestr = http.datetimeToString(self.ifModifiedSince)
232 self.sendHeader('If-Modified-Since', datestr)
233
234 if self.ifUnmodifiedSince:
235 datestr = http.datetimeToString(self.ifUnmodifiedSince)
236 self.sendHeader('If-Unmodified-Since', datestr)
237
238 if self.start or self.size:
239 start = self.start or 0
240 end = (self.size and (start + self.size - 1)) or None
241 rangeSpecs = "bytes=%s-%s" % (start, end or "")
242 self.sendHeader('Range', rangeSpecs)
243
244 self.endHeaders()
245
246 self._resetIdleCheck()
247
254
256 self._keepActive()
257 status = int(status_str)
258 self.status = status
259
260 if status in (http.OK, http.NO_CONTENT, http.PARTIAL_CONTENT):
261 return
262
263 if status == http.REQUESTED_RANGE_NOT_SATISFIABLE:
264 self._serverError(common.RANGE_NOT_SATISFIABLE,
265 "HTTP range not satisfiable")
266 if status == http.NOT_MODIFIED:
267 self._conditionFail(common.STREAM_NOT_MODIFIED,
268 "Stream not modified")
269 elif status == http.PRECONDITION_FAILED:
270 self._conditionFail(common.STREAM_MODIFIED, "Stream Modified")
271 elif status == http.NOT_FOUND:
272 self._streamNotAvailable(common.STREAM_NOTFOUND,
273 "Resource Not Found")
274 elif status == http.FORBIDDEN:
275 self._streamNotAvailable(common.STREAM_FORBIDDEN,
276 "Resource Forbidden")
277 if status in (http.MOVED_PERMANENTLY, http.FOUND):
278 self._serverError(common.NOT_IMPLEMENTED,
279 "HTTP redirection not supported")
280 else:
281 self._serverError(common.NOT_IMPLEMENTED,
282 "Unsupported HTTP response: %s (%s)"
283 % (message, status))
284
288
298
307
309 if self.info is not None:
310 if self._remaining == 0:
311 self.log("Request done, got %d bytes starting at %d from %s, "
312 "last modified on %s", self.info.size,
313 self.info.start, self.url.toString(),
314 ts2str(self.info.mtime))
315 self._streamDone()
316 return
317 if self.info:
318 self.log("Incomplete request, missing %d bytes from the expected "
319 "%d bytes starting at %d from %s", self._remaining,
320 self.info.size, self.info.start, self.url.toString())
321 else:
322 self.log("Incomplete request %s", self.url.toString())
323
327
328
329
331 self._updateCount += 1
332
336
338 if self._idlecheck:
339 self._idlecheck.cancel()
340 self._idlecheck = None
341 self._updateCount = 0
342
349
353
360
365
370
375
379
383
388
389
390 if __name__ == "__main__":
391 import sys
392
394 k, v = a.split('=', 1)
395 if v == 'None':
396 d[k] = None
397 try:
398 d[k] = int(v)
399 except:
400 d[k] = v
401
402
403 kwargs = {}
404 for a in sys.argv[1:]:
405 addarg(kwargs, a)
406
407 url = kwargs.pop('url')
408
410
414
418
422
424 print "Finished"
425 reactor.stop()
426
427 - def onInfo(self, getter, info):
433
434 - def onData(self, getter, data):
437
438
439 consumer = DummyConsumer()
440 requester = StreamRequester(5000, 5000)
441 requester.retrieve(consumer, http_utils.Url.fromString(url), **kwargs)
442 reactor.run()
443