Alexa-news-stentiment-evalu.../f-ask/flask_ask/models.py

461 lines
14 KiB
Python
Raw Normal View History

2019-04-12 19:32:29 +00:00
import inspect
from flask import json
from xml.etree import ElementTree
import aniso8601
from .core import session, context, current_stream, stream_cache, dbgdump
from .cache import push_stream
import uuid
class _Field(dict):
"""Container to represent Alexa Request Data.
Initialized with request_json and creates a dict object with attributes
to be accessed via dot notation or as a dict key-value.
Parameters within the request_json that contain their data as a json object
are also represented as a _Field object.
Example:
payload_object = _Field(alexa_json_payload)
request_type_from_keys = payload_object['request']['type']
request_type_from_attrs = payload_object.request.type
assert request_type_from_keys == request_type_from_attrs
"""
def __init__(self, request_json={}):
super(_Field, self).__init__(request_json)
for key, value in request_json.items():
if isinstance(value, dict):
value = _Field(value)
self[key] = value
def __getattr__(self, attr):
# converts timestamp str to datetime.datetime object
if 'timestamp' in attr:
return aniso8601.parse_datetime(self.get(attr))
return self.get(attr)
def __setattr__(self, key, value):
self.__setitem__(key, value)
class _Response(object):
def __init__(self, speech):
self._json_default = None
self._response = {
'outputSpeech': _output_speech(speech)
}
def simple_card(self, title=None, content=None):
card = {
'type': 'Simple',
'title': title,
'content': content
}
self._response['card'] = card
return self
def standard_card(self, title=None, text=None, small_image_url=None, large_image_url=None):
card = {
'type': 'Standard',
'title': title,
'text': text
}
if any((small_image_url, large_image_url)):
card['image'] = {}
if small_image_url is not None:
card['image']['smallImageUrl'] = small_image_url
if large_image_url is not None:
card['image']['largeImageUrl'] = large_image_url
self._response['card'] = card
return self
def list_display_render(self, template=None, title=None, backButton='HIDDEN', token=None, background_image_url=None, image=None, listItems=None, hintText=None):
directive = [
{
'type': 'Display.RenderTemplate',
'template': {
'type': template,
'backButton': backButton,
'title': title,
'listItems': listItems
}
}
]
if background_image_url is not None:
directive[0]['template']['backgroundImage'] = {
'sources': [
{'url': background_image_url}
]
}
if hintText is not None:
hint = {
'type':'Hint',
'hint': {
'type':"PlainText",
'text': hintText
}
}
directive.append(hint)
self._response['directives'] = directive
return self
def display_render(self, template=None, title=None, backButton='HIDDEN', token=None, background_image_url=None, image=None, text=None, hintText=None):
directive = [
{
'type': 'Display.RenderTemplate',
'template': {
'type': template,
'backButton': backButton,
'title': title,
'textContent': text
}
}
]
if background_image_url is not None:
directive[0]['template']['backgroundImage'] = {
'sources': [
{'url': background_image_url}
]
}
if image is not None:
directive[0]['template']['image'] = {
'sources': [
{'url': image}
]
}
if token is not None:
directive[0]['template']['token'] = token
if hintText is not None:
hint = {
'type':'Hint',
'hint': {
'type':"PlainText",
'text': hintText
}
}
directive.append(hint)
self._response['directives'] = directive
return self
def link_account_card(self):
card = {'type': 'LinkAccount'}
self._response['card'] = card
return self
def consent_card(self, permissions):
card = {
'type': 'AskForPermissionsConsent',
'permissions': [permissions]
}
self._response['card'] = card
return self
def render_response(self):
response_wrapper = {
'version': '1.0',
'response': self._response,
'sessionAttributes': session.attributes
}
kw = {}
if hasattr(session, 'attributes_encoder'):
json_encoder = session.attributes_encoder
kwargname = 'cls' if inspect.isclass(json_encoder) else 'default'
kw[kwargname] = json_encoder
dbgdump(response_wrapper, **kw)
return json.dumps(response_wrapper, **kw)
class statement(_Response):
def __init__(self, speech):
super(statement, self).__init__(speech)
self._response['shouldEndSession'] = True
class question(_Response):
def __init__(self, speech):
super(question, self).__init__(speech)
self._response['shouldEndSession'] = False
def reprompt(self, reprompt):
reprompt = {'outputSpeech': _output_speech(reprompt)}
self._response['reprompt'] = reprompt
return self
class buy(_Response):
def __init__(self, productId=None):
self._response = {
'shouldEndSession': True,
'directives': [{
'type': 'Connections.SendRequest',
'name': 'Buy',
'payload': {
'InSkillProduct': {
'productId': productId
}
},
'token': 'correlationToken'
}]
}
class refund(_Response):
def __init__(self, productId=None):
self._response = {
'shouldEndSession': True,
'directives': [{
'type': 'Connections.SendRequest',
'name': 'Cancel',
'payload': {
'InSkillProduct': {
'productId': productId
}
},
'token': 'correlationToken'
}]
}
class upsell(_Response):
def __init__(self, productId=None, msg=None):
self._response = {
'shouldEndSession': True,
'directives': [{
'type': 'Connections.SendRequest',
'name': 'Upsell',
'payload': {
'InSkillProduct': {
'productId': productId
},
'upsellMessage': msg
},
'token': 'correlationToken'
}]
}
class delegate(_Response):
def __init__(self, updated_intent=None):
self._response = {
'shouldEndSession': False,
'directives': [{'type': 'Dialog.Delegate'}]
}
if updated_intent:
self._response['directives'][0]['updatedIntent'] = updated_intent
class elicit_slot(_Response):
"""
Sends an ElicitSlot directive.
slot - The slot name to elicit
speech - The output speech
updated_intent - Optional updated intent
"""
def __init__(self, slot, speech, updated_intent=None):
self._response = {
'shouldEndSession': False,
'directives': [{
'type': 'Dialog.ElicitSlot',
'slotToElicit': slot,
}],
'outputSpeech': _output_speech(speech),
}
if updated_intent:
self._response['directives'][0]['updatedIntent'] = updated_intent
class confirm_slot(_Response):
"""
Sends a ConfirmSlot directive.
slot - The slot name to confirm
speech - The output speech
updated_intent - Optional updated intent
"""
def __init__(self, slot, speech, updated_intent=None):
self._response = {
'shouldEndSession': False,
'directives': [{
'type': 'Dialog.ConfirmSlot',
'slotToConfirm': slot,
}],
'outputSpeech': _output_speech(speech),
}
if updated_intent:
self._response['directives'][0]['updatedIntent'] = updated_intent
class confirm_intent(_Response):
"""
Sends a ConfirmIntent directive.
"""
def __init__(self, speech, updated_intent=None):
self._response = {
'shouldEndSession': False,
'directives': [{
'type': 'Dialog.ConfirmIntent',
}],
'outputSpeech': _output_speech(speech),
}
if updated_intent:
self._response['directives'][0]['updatedIntent'] = updated_intent
class audio(_Response):
"""Returns a response object with an Amazon AudioPlayer Directive.
Responses for LaunchRequests and IntentRequests may include outputSpeech in addition to an audio directive
Note that responses to AudioPlayer requests do not allow outputSpeech.
These must only include AudioPlayer Directives.
@ask.intent('PlayFooAudioIntent')
def play_foo_audio():
speech = 'playing from foo'
stream_url = www.foo.com
return audio(speech).play(stream_url)
@ask.intent('AMAZON.PauseIntent')
def stop_audio():
return audio('Ok, stopping the audio').stop()
"""
def __init__(self, speech=''):
super(audio, self).__init__(speech)
if not speech:
self._response = {}
self._response['directives'] = []
def play(self, stream_url, offset=0, opaque_token=None):
"""Sends a Play Directive to begin playback and replace current and enqueued streams."""
self._response['shouldEndSession'] = True
directive = self._play_directive('REPLACE_ALL')
directive['audioItem'] = self._audio_item(stream_url=stream_url, offset=offset, opaque_token=opaque_token)
self._response['directives'].append(directive)
return self
def enqueue(self, stream_url, offset=0, opaque_token=None):
"""Adds stream to the queue. Does not impact the currently playing stream."""
directive = self._play_directive('ENQUEUE')
audio_item = self._audio_item(stream_url=stream_url,
offset=offset,
push_buffer=False,
opaque_token=opaque_token)
audio_item['stream']['expectedPreviousToken'] = current_stream.token
directive['audioItem'] = audio_item
self._response['directives'].append(directive)
return self
def play_next(self, stream_url=None, offset=0, opaque_token=None):
"""Replace all streams in the queue but does not impact the currently playing stream."""
directive = self._play_directive('REPLACE_ENQUEUED')
directive['audioItem'] = self._audio_item(stream_url=stream_url, offset=offset, opaque_token=opaque_token)
self._response['directives'].append(directive)
return self
def resume(self):
"""Sends Play Directive to resume playback at the paused offset"""
directive = self._play_directive('REPLACE_ALL')
directive['audioItem'] = self._audio_item()
self._response['directives'].append(directive)
return self
def _play_directive(self, behavior):
directive = {}
directive['type'] = 'AudioPlayer.Play'
directive['playBehavior'] = behavior
return directive
def _audio_item(self, stream_url=None, offset=0, push_buffer=True, opaque_token=None):
"""Builds an AudioPlayer Directive's audioItem and updates current_stream"""
audio_item = {'stream': {}}
stream = audio_item['stream']
# existing stream
if not stream_url:
# stream.update(current_stream.__dict__)
stream['url'] = current_stream.url
stream['token'] = current_stream.token
stream['offsetInMilliseconds'] = current_stream.offsetInMilliseconds
# new stream
else:
stream['url'] = stream_url
stream['token'] = opaque_token or str(uuid.uuid4())
stream['offsetInMilliseconds'] = offset
if push_buffer: # prevents enqueued streams from becoming current_stream
push_stream(stream_cache, context['System']['user']['userId'], stream)
return audio_item
def stop(self):
"""Sends AudioPlayer.Stop Directive to stop the current stream playback"""
self._response['directives'].append({'type': 'AudioPlayer.Stop'})
return self
def clear_queue(self, stop=False):
"""Clears queued streams and optionally stops current stream.
Keyword Arguments:
stop {bool} set True to stop current current stream and clear queued streams.
set False to clear queued streams and allow current stream to finish
default: {False}
"""
directive = {}
directive['type'] = 'AudioPlayer.ClearQueue'
if stop:
directive['clearBehavior'] = 'CLEAR_ALL'
else:
directive['clearBehavior'] = 'CLEAR_ENQUEUED'
self._response['directives'].append(directive)
return self
def _copyattr(src, dest, attr, convert=None):
if attr in src:
value = src[attr]
if convert is not None:
value = convert(value)
setattr(dest, attr, value)
def _output_speech(speech):
try:
xmldoc = ElementTree.fromstring(speech)
if xmldoc.tag == 'speak':
return {'type': 'SSML', 'ssml': speech}
except (UnicodeEncodeError, ElementTree.ParseError) as e:
pass
return {'type': 'PlainText', 'text': speech}