Shampoo is an asyncio websocket protocol implementation for Autobahn.
Shampoo connects incoming websockets to user-defined endpoint classes
based on their path. The connecting client can make calls to the endpoint
using simple JSON messages, and the endpoint can send the client push
messages.
Note: Only Python versions 3.5 and up are supported.
$ pip install shampoo
First set up the Autobahn websocket server:
#!/usr/bin/env python
import asyncio
from autobahn.asyncio.websocket import WebSocketServerFactory
from shampoo.shampoo import ShampooProtocol
import txaio

if __name__ == '__main__':
    txaio.use_asyncio()
    factory = WebSocketServerFactory('ws://localhost:9007')
    factory.protocol = ShampooProtocol
    loop = asyncio.get_event_loop()
    coro = loop.create_server(factory, '', 9007)
    server = loop.run_until_complete(coro)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.close()
        loop.close()
This gives us a running server. Now we set up an example endpoint. We're
creating a 'number' endpoint. A client can connect to the endpoint for the
numbers 0 - 9 and call a method that adds another number to it. The endpoint
is available at ws://localhost:9007/number/<number>/.
from shampoo import shampoo


@shampoo.websocket_endpoint(r'/number/(?P<number>\d+)/?')
class NumberEndpoint():
    def __init__(self, number, protocol, peer, params, headers):
        """Connection initialization for the number endpoint"""
        self.number = int(number)
        # We only accept numbers 0 - 9.
        if self.number not in range(10):
            # Connection will be refused with http status code 404.
            raise shampoo.ShampooEndpointInitializationError(
                404, 'Number not available')

    def cleanup(self):
        """Called when the connection is closed. We can clean up here"""

    @shampoo.websocket_method()
    def add_number(self, request_data):
        """Method that can be called by the client.

        Needs to return a tuple with an `object`, `status_code` and a
        `message`. You can omit the status code and the message.
        """
        return {
            "number": self.number + request_data['number']
        }, 200, 'ok'  # You can omit the status code and message
We registered the endpoint using a decorator. Note that we also had to use a
decorator for the add_number method. Only allowed methods can be called
remotely; this prevents someone from calling the __init__ method or any
other method you would not want to expose. You can also mark a method as
callable manually by setting NumberEndpoint.add_number.is_endpoint_method = True.
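The allow-list idea can be illustrated without Shampoo. The following is only a sketch of the general pattern, not Shampoo's actual implementation: `expose`, `Endpoint` and `dispatch` are hypothetical names, and only the `is_endpoint_method` attribute is taken from the text above.

```python
def expose(func):
    """Hypothetical decorator: flag a function as remotely callable."""
    func.is_endpoint_method = True
    return func


class Endpoint:
    def __init__(self):
        self.number = 3

    @expose
    def add_number(self, request_data):
        return {"number": self.number + request_data["number"]}

    def secret(self, request_data):
        # Not flagged, so the dispatcher below refuses to call it.
        return {"leak": True}


def dispatch(endpoint, method_name, request_data):
    """Hypothetical dispatcher: call a method only if it carries the flag."""
    method = getattr(endpoint, method_name, None)
    if not callable(method) or not getattr(method, "is_endpoint_method", False):
        raise PermissionError("method not allowed: " + method_name)
    return method(request_data)
```

In this sketch, `dispatch(Endpoint(), "add_number", {"number": 4})` succeeds, while `"secret"`, `"__init__"` and unknown names raise `PermissionError`. Setting `Endpoint.secret.is_endpoint_method = True` afterwards would expose `secret` as well, which mirrors the manual option described above.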
You can also manually register the endpoint itself using register_endpoint:
from shampoo.shampoo import register_endpoint
register_endpoint(r'/number/(?P<number>\d+)/?', NumberEndpoint)
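To see what the path pattern captures, here is a small standalone sketch using only the standard `re` module. It assumes the whole path must match the pattern; how Shampoo applies the regular expression internally is not stated here, and `match_path` is a hypothetical helper.

```python
import re

# Same pattern as the one passed to register_endpoint above.
PATTERN = re.compile(r'/number/(?P<number>\d+)/?')


def match_path(path):
    """Return the named groups for a matching path, or None."""
    match = PATTERN.fullmatch(path)
    return match.groupdict() if match else None
```

Note that the captured value is a string, which is why the endpoint converts it with `int(number)`. The pattern also accepts `/number/42/`; rejecting numbers outside 0 - 9 is left to the endpoint's `__init__`.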
That's it. Now we can connect to this server and make calls. For example,
make a connection to ws://localhost:9007/number/3/. The client needs to
request the shampoo protocol, otherwise the connection will be refused.
When the connection is established we can make a call by sending a JSON
message:
{
    "method": "add_number",
    "request_data": {
        "number": 4
    },
    "request_id": 1
}
This will give the following response:
{
    "response_data": {
        "number": 7
    },
    "message": "ok",
    "status": 200,
    "request_id": 1
}
For the exact specification of the request and response JSON, see
request.json and response.json.
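On the client side, the request and response messages can be built and checked with the standard `json` module. This is a minimal sketch that uses only the field names from the examples above; `build_request` and `parse_response` are hypothetical helpers, and treating every status other than 200 as an error is an assumption.

```python
import itertools
import json

# Hypothetical counter so that every request gets a unique request_id.
_request_ids = itertools.count(1)


def build_request(method, request_data):
    """Serialize a call using the field names from the request example."""
    return json.dumps({
        "method": method,
        "request_data": request_data,
        "request_id": next(_request_ids),
    })


def parse_response(raw, expected_request_id):
    """Decode a response and return its data, or raise on mismatch or error."""
    response = json.loads(raw)
    if response["request_id"] != expected_request_id:
        raise ValueError("response belongs to another request")
    if response["status"] != 200:  # assumption: 200 is the only success code
        raise RuntimeError(
            "{}: {}".format(response["status"], response["message"]))
    return response["response_data"]
```

Matching on `request_id` lets a client keep several calls in flight on one connection and pair each response with the call that caused it.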
You can notify a connected client of any events with push messages. This is
an example using Redis pubsub:
import asyncio
import json

import asyncio_redis
from shampoo import shampoo


@shampoo.websocket_endpoint(r'/notifications/?')
class NotificationEndpoint():
    def __init__(self, protocol, **kwargs):
        self.protocol = protocol
        self.protocol.register_coroutine(self.notifications)

    @asyncio.coroutine
    def notifications(self):
        self.subscription = Subscription()
        yield from self.subscription.initialize()
        while True:
            message = yield from self.subscription.next_message()
            self.protocol.push_message({'message': message})


class Subscription():
    """Listens to Redis and relays messages to all subscribers.

    This class sets up one connection to Redis and distributes all
    messages to all subscribers. Without this there would be a Redis
    connection per subscriber (websocket connection).
    """
    # Shared by all instances, so only one Redis connection is opened.
    redis = None
    pubsub = None
    subscriptions = []

    @asyncio.coroutine
    def initialize(self):
        if Subscription.redis is None:
            Subscription.redis = yield from asyncio_redis.Connection.create(
                host='127.0.0.1', port=6379)
            Subscription.pubsub = (
                yield from Subscription.redis.start_subscribe())
            yield from Subscription.pubsub.subscribe(['notifications'])
            asyncio.get_event_loop().create_task(
                Subscription.activity_monitor())

    @staticmethod
    @asyncio.coroutine
    def activity_monitor():
        while True:
            result = yield from Subscription.pubsub.next_published()
            message = json.loads(result.value)
            # Swap the list first so new waiters queue for the next message.
            subscriptions = Subscription.subscriptions
            Subscription.subscriptions = []
            for subscription in subscriptions:
                subscription.future.set_result(message)

    @asyncio.coroutine
    def next_message(self):
        self.future = asyncio.Future()
        Subscription.subscriptions.append(self)
        # Wait until activity_monitor resolves the future with a message.
        return (yield from self.future)
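The fan-out idea behind the Subscription class (one source, many waiters, each waiter parked on a future) can be tried without Redis. The sketch below is an illustration only: `Broadcaster` and `demo` are hypothetical names, it uses `async`/`await` syntax, and `asyncio.get_running_loop` requires Python 3.7 or newer.

```python
import asyncio


class Broadcaster:
    """Hand each published message to every coroutine currently waiting."""

    def __init__(self):
        self._waiters = []

    async def next_message(self):
        # Park the caller on a fresh future until publish() resolves it.
        future = asyncio.get_running_loop().create_future()
        self._waiters.append(future)
        return await future

    def publish(self, message):
        # Swap the list first so late arrivals wait for the next message.
        waiters, self._waiters = self._waiters, []
        for future in waiters:
            if not future.done():
                future.set_result(message)


async def demo():
    broadcaster = Broadcaster()
    tasks = [asyncio.ensure_future(broadcaster.next_message())
             for _ in range(3)]
    await asyncio.sleep(0)  # let every task register its future
    broadcaster.publish("hello")
    return await asyncio.gather(*tasks)
```

Running `asyncio.run(demo())` returns a list with three copies of `"hello"`, one per waiter. A waiter that has not registered when `publish` runs misses that message in this sketch.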
The monitor decodes every payload with json.loads, so the published value
must be valid JSON. When you publish a message to Redis with
redis.publish('notifications', '"This is a notification!"')
the client gets the following message:
{
    "push_data": {
        "message": "This is a notification!"
    }
}
See push_message.json for the exact specification.
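A client receives both responses and push messages on the same connection, so it has to tell them apart. The sketch below does this by looking at the keys shown in the examples; `classify` is a hypothetical helper, and distinguishing the two message kinds by these keys is an assumption based on the examples rather than on the JSON specifications.

```python
import json


def classify(raw):
    """Return ("push", data) or ("response", message) for an incoming frame."""
    message = json.loads(raw)
    if "push_data" in message:
        return "push", message["push_data"]
    if "request_id" in message:
        return "response", message
    raise ValueError("unrecognized message")
```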
For JSON decoding and encoding the standard Python json module is used. If
you want to use a custom encoder or decoder you can set them like this:
from shampoo import shampoo
from custom_json import CustomJSONEncoder, CustomJSONDecoder

shampoo.set_json_encoder(CustomJSONEncoder)
shampoo.set_json_decoder(CustomJSONDecoder)
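The custom_json module above is a placeholder. As an illustration, such a module could hold an encoder and decoder pair that round-trips datetime values. This is a sketch under assumptions: the `__datetime__` marker key is made up for this example, `datetime.fromisoformat` requires Python 3.7 or newer, and it assumes the classes are used the way the standard json module uses its `cls` argument.

```python
import json
from datetime import datetime


class CustomJSONEncoder(json.JSONEncoder):
    """Encode datetime objects as a tagged ISO 8601 string."""

    def default(self, obj):
        if isinstance(obj, datetime):
            return {"__datetime__": obj.isoformat()}
        return super().default(obj)


class CustomJSONDecoder(json.JSONDecoder):
    """Turn tagged ISO 8601 strings back into datetime objects."""

    def __init__(self, *args, **kwargs):
        kwargs.setdefault("object_hook", self._restore)
        super().__init__(*args, **kwargs)

    @staticmethod
    def _restore(obj):
        if set(obj) == {"__datetime__"}:
            return datetime.fromisoformat(obj["__datetime__"])
        return obj
```

With the standard library alone, `json.dumps(value, cls=CustomJSONEncoder)` and `json.loads(text, cls=CustomJSONDecoder)` exercise both classes.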