「AWS無料相談会」をオンラインで開催中

AppSyncのパイプラインリソルバーを利用して履歴データを管理する

  • 2019.03.17
  • AWS

AWS AppSyncのパイプラインリソルバーを利用して、
履歴データ管理を構築してみました。

パイプラインリソルバーとは?

パイプラインリゾルバーは、データソースに対してオペレーションを
順番に実行する機能を提供します。
複数のデータソースに対して順番に実行することも可能です。

パイプラインリソルバーを利用した履歴データ管理の構築

今回は、従業員が商品の在庫数を管理するシチュエーションを想定して
在庫数の変動を確認できる履歴データのテーブルを
パイプラインリソルバーを利用して構築していきます。

1. データソースの作成

データソースにはDynamoDBを利用します。下記のテーブルを作成します。

1-1. 在庫マスタテーブル

在庫マスタは文字列のハッシュキーを持つシンプルなテーブルを作成しました。

1-2. 履歴テーブル

履歴は作成日をレンジキーとして保持し、時系列データとします。

TTL属性を指定し、履歴データは自動的に削除されるようにしました。

2. AppSync APIの作成

在庫マスタテーブルをインポートしてAppSync APIを作成します。

自動的にスキーマが作成されますので、
商品名(ProductName)や在庫数(NumberOfStock)の属性を追加します。
下記の通りです。

input CreatePostInput {
    PK: String!
    ProductName: String
    NumberOfStock: Int
}

input DeletePostInput {
    PK: String!
}

type Mutation {
    createPost(input: CreatePostInput!): Post
    updatePost(input: UpdatePostInput!): Post
    deletePost(input: DeletePostInput!): Post
}

type Post {
    PK: String!
}

type PostConnection {
    items: [Post]
    nextToken: String
}

type Query {
    getPost(PK: String!): Post
    listPosts(filter: TablePostFilterInput, limit: Int, nextToken: String): PostConnection
}

type Subscription {
    onCreatePost(PK: String): Post
        @aws_subscribe(mutations: ["createPost"])
    onUpdatePost(PK: String): Post
        @aws_subscribe(mutations: ["updatePost"])
    onDeletePost(PK: String): Post
        @aws_subscribe(mutations: ["deletePost"])
}

input TableBooleanFilterInput {
    ne: Boolean
    eq: Boolean
}

input TableFloatFilterInput {
    ne: Float
    eq: Float
    le: Float
    lt: Float
    ge: Float
    gt: Float
    contains: Float
    notContains: Float
    between: [Float]
}

input TableIDFilterInput {
    ne: ID
    eq: ID
    le: ID
    lt: ID
    ge: ID
    gt: ID
    contains: ID
    notContains: ID
    between: [ID]
    beginsWith: ID
}

input TableIntFilterInput {
    ne: Int
    eq: Int
    le: Int
    lt: Int
    ge: Int
    gt: Int
    contains: Int
    notContains: Int
    between: [Int]
}

input TablePostFilterInput {
    PK: TableStringFilterInput
}

input TableStringFilterInput {
    ne: String
    eq: String
    le: String
    lt: String
    ge: String
    gt: String
    contains: String
    notContains: String
    between: [String]
    beginsWith: String
}

input UpdatePostInput {
    PK: String!
}

3. パイプラインリソルバーへの変換 

商品在庫を登録する時のリソルバーを、パイプラインリソルバーに変換します。
該当するリソルバーの Convert to pipeline resolver リンクを
クリックすることで簡単にパイプラインリソルバーに変換できます。

4. ファンクションの作成

パイプラインリソルバーに割り当てる複数のファンクションを作成します。

4-1. 在庫マスタ登録ファンクションの作成

createMain というファンクション名で作成しました。
主要な処理としましては、

  1. タイムスタンプを createdDate という項目で変数として定義する。
  2. createdDate と全てのKeyを含めた baseobject を作成し、 attributeValues に設定する。

などが挙げられます。

リクエストマッピングテンプレートは下記の通りです。

#set($baseobject = {'createdDate': $util.time.nowEpochMilliSeconds()})
#foreach($key in $ctx.args.input.keySet())
    $util.qr($baseobject.put($key, $ctx.args.input.get($key)))
#end

{
  "version": "2018-05-29",
  "operation": "PutItem",
  "key": {
    "PK": $util.dynamodb.toDynamoDBJson($ctx.args.input.PK)
  },
  "attributeValues" : $util.dynamodb.toMapValuesJson($baseobject)
}

レスポンスマッピングテンプレートは下記の通りです。

#if($ctx.error)
    $util.error($ctx.error.message, $ctx.error.type)
#end
$util.toJson($ctx.result)

4-2. 履歴登録ファンクションの作成

createHistory というファンクション名で作成しました。

主要な処理としましては、

  1. exp というタイムスタンプの変数を作成し、 expdate というTTL項目に設定する。
  2. レンジキーに createMain で作成されたデータの createDate を設定する。
  3. 上記の項目を含めた historyobject を生成し、 attributeValues に設定する。

などが挙げられます。

リクエストマッピングテンプレートは下記の通りです。

#set($pk = $ctx.prev.result.PK)

#set($history = 24 * 3600 * 1000)
#set($timestamp = $ctx.prev.result.createdDate)
##TTL must be in seconds for DDB to evict items.
#set($exp = ($history + $timestamp)/1000)


#set($historyobject = {
    'PK': $pk,
    'SK': $timestamp,
    'expdate': $exp
})
#foreach($key in $ctx.prev.result.keySet())
    $util.qr($historyobject.put($key, $ctx.prev.result.get($key)))
#end


{
    "version" : "2018-05-29",
    "operation" : "PutItem",
    "key" : {
        "PK" : { "S" : "$pk" },
        "SK" : { "N" : "$timestamp" }
    },
    "attributeValues" : $util.dynamodb.toMapValuesJson($historyobject),
}

レスポンスマッピングテンプレートは下記の通りです。

#if($ctx.error)
    $util.error($ctx.error.message, $ctx.error.errorType)
#end
$util.toJson($ctx.result)

5. パイプラインリソルバへの割り当て

作成したファンクションをパイプラインリソルバへ割り当てます。

6. 動作確認

6-1. 商品を登録する。

チョコレートの在庫を1,000で登録してみます。
下記のMutationを実行しました。

mutation createPost($createpostinput: CreatePostInput!) {
  createPost(input: $createpostinput) {
    PK
  }
}

登録するデータは下記の通りです。

{
  "createpostinput": {
    "PK": "従業員コード01-商品在庫管理テーブル-ID01",
    "ProductName": "チョコレート",
    "NumberOfStock": 1000
  }
}

登録したデータを見てみましょう。

チョコレートの在庫数が1,000で登録されていることが確認できました。

次に、履歴データを見てみましょう。

レンジキーにタイムスタンプが設定された履歴データが登録されています。

6-2. 在庫の再登録

在庫を350に変更して再登録してみます。

{
  "createpostinput": {
    "PK": "従業員コード01-商品在庫管理テーブル-ID01",
    "ProductName": "チョコレート",
    "NumberOfStock": 350
  }
}

データはどのようになっているでしょうか。
まずは、在庫マスタです。

チョコレートの在庫が350に変更されていることが確認できます。

次に、履歴データを見てみましょう。

新しいレンジキーのタイムスタンプで、
履歴データが作成されていることを確認できました。

また、TTLを指定していますので履歴データは自動的に削除されます。

総括

ここまでの履歴データ管理は、
全てAppSync単体の機能で構築することが出来ています。
AppSyncは発想次第で様々な利活用の方法があり、とても楽しいですね!
GraphQL ✖︎︎ AWS AppSync ✖︎ データソースの組み合わせは、
次世代のデータAPIを彷彿とさせます!