import sys
import time
import argparse
import logging
log = logging.getLogger('eventsource.client')
from tornado.ioloop import IOLoop
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
[docs]class Event(object):
"""
Defines a received event
"""
def __init__(self):
self.name = None
self.data = None
self.id = None
def __repr__(self):
return "Event<%s,%s,%s>" % (str(self.id), str(self.name), str(self.data.replace('\n','\\n')))
[docs]class EventSourceClient(object):
def __init__(self,url,action,target,callback=None,retry=0):
"""
Build the event source client
:param url: string, the url to connect to
:param action: string of the listening action to connect to
:param target: string with the listening token
:param callback: function with one parameter (Event) that gets called for each received event
:param retry: timeout between two reconnections (0 means no reconnection)
"""
self._url = "http://%s/%s/%s" % (url,action,target)
AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")
self.http_client = AsyncHTTPClient()
self.http_request = HTTPRequest(url=self._url,
method='GET',
headers={"content-type":"text/event-stream"},
request_timeout=0,
streaming_callback=self.handle_stream)
if callback is None:
self.cb = lambda e: log.info( "received %s" % (e,) )
else:
self.cb = callback
self.retry_timeout = int(retry)
[docs] def poll(self):
"""
Function to call to start listening
"""
if self.retry_timeout == 0:
self.http_client.fetch(self.http_request, self.handle_request)
IOLoop.instance().start()
while self.retry_timeout!=0:
self.http_client.fetch(self.http_request, self.handle_request)
IOLoop.instance().start()
time.sleep(self.retry_timeout/1000)
[docs] def end(self):
"""
Function to call to end listening
"""
self.retry_timeout=0
IOLoop.instance().stop()
[docs] def handle_stream(self,message):
"""
Acts on message reception
:param message: string of an incoming message
parse all the fields and builds an Event object that is passed to the callback function
"""
event = Event()
for line in message.strip('\r\n').split('\r\n'):
(field, value) = line.split(":",1)
if field == 'event':
event.name = value
elif field == 'data':
value = value.lstrip()
if event.data is None:
event.data = value
else:
event.data = "%s\n%s" % (event.data, value)
elif field == 'id':
event.id = value
elif field == 'retry':
try:
self.retry_timeout = int(value)
log.info( "timeout reset: %s" % (value,) )
except ValueError:
pass
elif field == '':
log.info( "received comment: %s" % (value,) )
else:
raise Exception("Unknown field !")
if event.name is not None:
self.cb(event)
[docs] def handle_request(self,response):
"""
Function that gets called on non long-polling actions,
on error or on end of polling.
"""
if response.error:
log.error(response.error)
else:
log.info("disconnection requested")
self.retry_timeout=0
IOLoop.instance().stop()
[docs]def start():
parser = argparse.ArgumentParser(prog=sys.argv[0],
description="Event Source Client")
parser.add_argument("-H",
"--host",
dest="host",
default='127.0.0.1',
help='Host to connect to')
# PORT ARGUMENT
parser.add_argument("-P",
"--port",
dest="port",
default='8888',
help='Port to be used connection')
parser.add_argument("-d",
"--debug",
dest="debug",
action="store_true",
help='enables debug output')
parser.add_argument("-r",
"--retry",
dest="retry",
default='-1',
help='Reconnection timeout')
parser.add_argument(dest="token",
help='Token to be used for connection')
args = parser.parse_args(sys.argv[1:])
if args.debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
###
def log_events(event):
log.info( "received %s" % (event,) )
EventSourceClient(url="%(host)s:%(port)s" % args.__dict__,
action="poll",
target=args.token,
retry=args.retry).poll()
###
if __name__ == "__main__":
start()