ソース¶
appengine_config.py¶
GAEで標準ライブラリ以外を使用するための設定です。
1 2 3 4 5 6 7 | #!/usr/bin/env python
# coding: utf-8
from google.appengine.ext import vendor
# libディレクトリにライブラリがあることを指定します
vendor.add('lib')
|
main.py¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 | #!/usr/bin/env python
# coding: utf-8
import base64
import cloudstorage as gcs
import hashlib
import hmac
import json
import logging
import os
import webapp2
from google.appengine.api import app_identity
from google.appengine.api import urlfetch
import config
ENDPOINT = 'https://api.line.me/v2/bot/message/reply'
class MainHandler(webapp2.RequestHandler):
def get(self):
self.response.write('Hello Google App Engine')
def is_production():
"""Product環境かを確認します"""
return os.getenv('SERVER_SOFTWARE', '').startswith('Google App Engine/')
def is_valid_signature(signature, request_body):
"""署名を検証します"""
hash_digest = hmac.new(config.CHANNEL_SECRET.encode('utf-8'),
request_body, hashlib.sha256).digest()
return signature == base64.b64encode(hash_digest).decode()
def text_reply(reply_token, message):
"""textのメッセージを返信します"""
payload = {'replyToken': reply_token,
'messages': [{'type': 'text',
'text': message}]}
headers = {'Content-Type': 'application/json; charset=UTF-8',
'Authorization': 'Bearer {}'.format(config.CHANNEL_ACCESS_TOKEN)}
# GAEでは外部にリクエストする場合urlfetchを使います
r = urlfetch.fetch(ENDPOINT,
method=urlfetch.POST,
payload=json.dumps(payload, ensure_ascii=False),
headers=headers)
logging.debug(r.status_code)
logging.debug(r.content)
def get_image_from_line(message_id):
"""画像をLINEサーバへ取りに行きます"""
url = 'https://api.line.me/v2/bot/message/{}/content'.format(message_id)
headers = {'Authorization': 'Bearer {}'.format(config.CHANNEL_ACCESS_TOKEN)}
try:
r = urlfetch.fetch(url,
method=urlfetch.GET,
headers=headers)
return r.content
except Exception, e:
logging.error('Failed get_image')
raise e
def upload_to_gcs(event):
"""GCSにファイルを保存する"""
try:
# LINEサーバから画像を取得する
image = get_image_from_line(event['message']['id'])
# GCSのバケット名(今回はデフォルトのバッケットを使用する)
bucket_name = app_identity.get_default_gcs_bucket_name()
# ファイル名は重複しないようにLINEのメッセージIDにする
file_name = '/{}/{}'.format(bucket_name, event['message']['id'])
# ファイルをGCSに保存する
with gcs.open(file_name, 'w', content_type='image/jpeg') as f:
f.write(image)
message = u'保存しました。\nBucket:{}\nObject:{}'.format(bucket_name, file_name)
text_reply(event['replyToken'], message)
except Exception, e:
logging.error('Failed get_image')
raise e
class CallbackHandler(webapp2.RequestHandler):
"""LINEからのリクエストを受け取って処理をします"""
def post(self):
try:
# lineからのメッセージを取得
request_body = self.request.body
line_signature = self.request.headers.get('X-Line-Signature')
logging.debug(request_body)
logging.debug(line_signature)
# 署名の検証
if is_production() and not is_valid_signature(line_signature, request_body):
logging.error('Invalid signature.')
return
# jsonをdictに変換
events = json.loads(request_body)
for event in events['events']:
if event['message']['type'] == 'text':
# テキストのメッセージが送られてきた場合
text_reply(event['replyToken'], event['message']['text'])
elif event['message']['type'] == 'image':
# 画像が送られた場合
upload_to_gcs(event)
except Exception, e:
logging.error(e)
app = webapp2.WSGIApplication([
('/', MainHandler),
('/callback', CallbackHandler),
], debug=True)
|
注釈
アップロードした画像は 管理コンソール の[Storage]から保存した画像が確認できます