ソース

worker.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/env python
# coding: utf-8

import cloudstorage as gcs
import json
import logging

import webapp2
from google.appengine.api import app_identity
from google.appengine.api import urlfetch
from google.cloud import vision
from oauth2client.service_account import ServiceAccountCredentials

import config

ENDPOINT = 'https://api.line.me/v2/bot/message/reply'

SCOPE = ['https://www.googleapis.com/auth/cloud-platform']


def text_reply(reply_token, message):
    """textのメッセージを返信します"""
    payload = {'replyToken': reply_token,
               'messages': [{'type': 'text',
                             'text': message}]}

    headers = {'Content-Type': 'application/json; charset=UTF-8',
               'Authorization': 'Bearer {}'.format(config.CHANNEL_ACCESS_TOKEN)}

    # GAEでは外部にリクエストする場合urlfetchを使います
    r = urlfetch.fetch(ENDPOINT,
                       method=urlfetch.POST,
                       payload=json.dumps(payload, ensure_ascii=False),
                       headers=headers)

    logging.debug(r.status_code)
    logging.debug(r.content)


def get_image_from_line(message_id):
    """画像をLINEサーバへ取りに行きます"""

    url = 'https://api.line.me/v2/bot/message/{}/content'.format(message_id)

    headers = {'Authorization': 'Bearer {}'.format(config.CHANNEL_ACCESS_TOKEN)}

    try:
        r = urlfetch.fetch(url,
                           method=urlfetch.GET,
                           headers=headers)
        return r.content
    except Exception, e:
        logging.error('Failed get_image')
        raise e


def upload_to_gcs(event):
    """GCSにファイルを保存する"""
    try:
        # LINEサーバから画像を取得する
        image = get_image_from_line(event['message']['id'])

        # GCSのバケット名(今回はデフォルトのバッケットを使用する)
        bucket_name = app_identity.get_default_gcs_bucket_name()
        # ファイル名は重複しないようにLINEのメッセージIDにする
        file_name = '/{}/{}'.format(bucket_name, event['message']['id'])

        # ファイルをGCSに保存する
        with gcs.open(file_name, 'w', content_type='image/jpeg') as f:
            f.write(image)

        return bucket_name, event['message']['id']

    except Exception, e:
        logging.error('Failed get_image')
        raise e


def request_vision_api(bucket_name, file_name):
    """Vision APIにリクエストを送信します"""
    try:
        credentials = ServiceAccountCredentials.from_json_keyfile_name(config.KEY_FILE,
                                                                       scopes=SCOPE)
        source_uri = 'gs://{}/{}'.format(bucket_name, file_name)
        logging.debug('source_uri: {}'.format(source_uri))

        client = vision.Client(project=app_identity.get_application_id(),
                               credentials=credentials)
        image = client.image(source_uri=source_uri)
        return image.detect_labels(limit=5)

    except Exception, e:
        logging.error('failed request vision api.')
        raise e


class MessagingHandler(webapp2.RequestHandler):
    def post(self):
        try:
            events = json.loads(self.request.get('events'))

            for event in events['events']:
                if event['message']['type'] == 'text':
                    # テキストのメッセージが送られてきた場合
                    text_reply(event['replyToken'], event['message']['text'])

                elif event['message']['type'] == 'image':
                    # 画像が送られた場合

                    bucket_name, file_name = upload_to_gcs(event)
                    labels = request_vision_api(bucket_name, file_name)

                    result = ''
                    for label in labels:
                        result += '{}: {}\n'.format(label.description, label.score)

                    message = u'この写真には、\n\n{}のようなものが写っているよ'.format(result)
                    text_reply(event['replyToken'], message)

        except Exception, e:
            logging.error(e)

app = webapp2.WSGIApplication([
    ('/task/messaging', MessagingHandler),
], debug=True)

app.yaml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
project: your-project-id # 各自のプロジェクトIDを指定
version: 1
runtime: python27
api_version: 1
threadsafe: yes

# ルーティング
handlers:
# task queue
- url: /task/messaging
  script: worker.app
  login: admin

# 上記以外はmain.pyに向けます
- url: .*
  script: main.app # main.pyではない

# 使用ライブラリの指定
libraries:
# Webフレームワークのwebapp2を使用する
- name: webapp2
  version: "2.5.2"

queue.yaml

1
2
3
4
5
6
7
queue:
- name: default
  rate: 1/s # 1秒に1回処理する
  retry_parameters:
    # リトライの最大回数の設定
    # タスクはエラーになると自動でリトライしてくれます
    task_retry_limit: 3

main.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#!/usr/bin/env python
# coding: utf-8

import base64
import hashlib
import hmac
import logging
import os

import webapp2
from google.appengine.api import taskqueue

import config


class MainHandler(webapp2.RequestHandler):
    def get(self):
        self.response.write('Hello Google App Engine')


def is_production():
    """Product環境かを確認します"""
    return os.getenv('SERVER_SOFTWARE', '').startswith('Google App Engine/')


def is_valid_signature(signature, request_body):
    """署名を検証します"""
    hash_digest = hmac.new(config.CHANNEL_SECRET.encode('utf-8'),
                           request_body, hashlib.sha256).digest()
    return signature == base64.b64encode(hash_digest).decode()


class CallbackHandler(webapp2.RequestHandler):
    """LINEからのリクエストを受け取って処理する"""
    def post(self):
        try:
            # lineからのメッセージを取得
            request_body = self.request.body
            line_signature = self.request.headers.get('X-Line-Signature')

            logging.debug(request_body)
            logging.debug(line_signature)

            # 署名の検証
            if is_production() and not is_valid_signature(line_signature, request_body):
                logging.error('Invalid signature.')
                return

            # QueueにPushする
            taskqueue.add(url='/task/messaging',
                          params={'events': request_body})
        except Exception, e:
            logging.error(e)

app = webapp2.WSGIApplication([
    ('/', MainHandler),
    ('/callback', CallbackHandler),
], debug=True)