AppSyncのパイプラインリソルバーを利用して履歴データを管理する

AWS AppSyncのパイプラインリソルバーを利用して、
履歴データ管理を構築してみました。

パイプラインリソルバーとは?

パイプラインリゾルバーは、データソースに対してオペレーションを
順番に実行する機能を提供します。
複数のデータソースに対して順番に実行することも可能です。

パイプラインリソルバーを利用した履歴データ管理の構築

今回は、従業員が商品の在庫数を管理するシチュエーションを想定して
在庫数の変動を確認できる履歴データのテーブルを
パイプラインリソルバーを利用して構築していきます。

1. データソースの作成

データソースにはDynamoDBを利用します。下記のテーブルを作成します。

1-1. 在庫マスタテーブル

在庫マスタは文字列のハッシュキーを持つシンプルなテーブルを作成しました。

1-2. 履歴テーブル

履歴は作成日をレンジキーとして保持し、時系列データとします。

TTL属性を指定し、履歴データは自動的に削除されるようにしました。

2. AppSync APIの作成

在庫マスタテーブルをインポートしてAppSync APIを作成します。

自動的にスキーマが作成されますので、
商品名(ProductName)や在庫数(NumberOfStock)の属性を追加します。
下記の通りです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
input CreatePostInput {
PK: String!
ProductName: String
NumberOfStock: Int
}

input DeletePostInput {
PK: String!
}

type Mutation {
createPost(input: CreatePostInput!): Post
updatePost(input: UpdatePostInput!): Post
deletePost(input: DeletePostInput!): Post
}

type Post {
PK: String!
}

type PostConnection {
items: [Post]
nextToken: String
}

type Query {
getPost(PK: String!): Post
listPosts(filter: TablePostFilterInput, limit: Int, nextToken: String): PostConnection
}

type Subscription {
onCreatePost(PK: String): Post
@aws_subscribe(mutations: ["createPost"])
onUpdatePost(PK: String): Post
@aws_subscribe(mutations: ["updatePost"])
onDeletePost(PK: String): Post
@aws_subscribe(mutations: ["deletePost"])
}

input TableBooleanFilterInput {
ne: Boolean
eq: Boolean
}

input TableFloatFilterInput {
ne: Float
eq: Float
le: Float
lt: Float
ge: Float
gt: Float
contains: Float
notContains: Float
between: [Float]
}

input TableIDFilterInput {
ne: ID
eq: ID
le: ID
lt: ID
ge: ID
gt: ID
contains: ID
notContains: ID
between: [ID]
beginsWith: ID
}

input TableIntFilterInput {
ne: Int
eq: Int
le: Int
lt: Int
ge: Int
gt: Int
contains: Int
notContains: Int
between: [Int]
}

input TablePostFilterInput {
PK: TableStringFilterInput
}

input TableStringFilterInput {
ne: String
eq: String
le: String
lt: String
ge: String
gt: String
contains: String
notContains: String
between: [String]
beginsWith: String
}

input UpdatePostInput {
PK: String!
}

3. パイプラインリソルバーへの変換 

商品在庫を登録する時のリソルバーを、パイプラインリソルバーに変換します。
該当するリソルバーの Convert to pipeline resolver リンクを
クリックすることで簡単にパイプラインリソルバーに変換できます。

4. ファンクションの作成

パイプラインリソルバーに割り当てる複数のファンクションを作成します。

4-1. 在庫マスタ登録ファンクションの作成

createMain というファンクション名で作成しました。
主要な処理としましては、

  1. タイムスタンプを createdDate という項目で変数として定義する。
  2. createdDate と全てのKeyを含めた baseobject を作成し、 attributeValues に設定する。

などが挙げられます。

リクエストマッピングテンプレートは下記の通りです。

1
2
3
4
5
6
7
8
9
10
11
12
13
#set($baseobject = {'createdDate': $util.time.nowEpochMilliSeconds()})
#foreach($key in $ctx.args.input.keySet())
$util.qr($baseobject.put($key, $ctx.args.input.get($key)))
#end

{
"version": "2018-05-29",
"operation": "PutItem",
"key": {
"PK": $util.dynamodb.toDynamoDBJson($ctx.args.input.PK)
},
"attributeValues" : $util.dynamodb.toMapValuesJson($baseobject)
}

レスポンスマッピングテンプレートは下記の通りです。

1
2
3
4
#if($ctx.error)
$util.error($ctx.error.message, $ctx.error.type)
#end
$util.toJson($ctx.result)

4-2. 履歴登録ファンクションの作成

createHistory というファンクション名で作成しました。

主要な処理としましては、

  1. exp というタイムスタンプの変数を作成し、 expdate というTTL項目に設定する。
  2. レンジキーに createMain で作成されたデータの createDate を設定する。
  3. 上記の項目を含めた historyobject を生成し、 attributeValues に設定する。

などが挙げられます。

リクエストマッピングテンプレートは下記の通りです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#set($pk = $ctx.prev.result.PK)

#set($history = 24 * 3600 * 1000)
#set($timestamp = $ctx.prev.result.createdDate)
##TTL must be in seconds for DDB to evict items.
#set($exp = ($history + $timestamp)/1000)


#set($historyobject = {
'PK': $pk,
'SK': $timestamp,
'expdate': $exp
})
#foreach($key in $ctx.prev.result.keySet())
$util.qr($historyobject.put($key, $ctx.prev.result.get($key)))
#end


{
"version" : "2018-05-29",
"operation" : "PutItem",
"key" : {
"PK" : { "S" : "$pk" },
"SK" : { "N" : "$timestamp" }
},
"attributeValues" : $util.dynamodb.toMapValuesJson($historyobject),
}

レスポンスマッピングテンプレートは下記の通りです。

1
2
3
4
#if($ctx.error)
$util.error($ctx.error.message, $ctx.error.errorType)
#end
$util.toJson($ctx.result)

5. パイプラインリソルバへの割り当て

作成したファンクションをパイプラインリソルバへ割り当てます。

6. 動作確認

6-1. 商品を登録する。

チョコレートの在庫を1,000で登録してみます。
下記のMutationを実行しました。

1
2
3
4
5
mutation createPost($createpostinput: CreatePostInput!) {
createPost(input: $createpostinput) {
PK
}
}

登録するデータは下記の通りです。

1
2
3
4
5
6
7
{
"createpostinput": {
"PK": "従業員コード01-商品在庫管理テーブル-ID01",
"ProductName": "チョコレート",
"NumberOfStock": 1000
}
}

登録したデータを見てみましょう。

チョコレートの在庫数が1,000で登録されていることが確認できました。

次に、履歴データを見てみましょう。

レンジキーにタイムスタンプが設定された履歴データが登録されています。

6-2. 在庫の再登録

在庫を350に変更して再登録してみます。

1
2
3
4
5
6
7
{
"createpostinput": {
"PK": "従業員コード01-商品在庫管理テーブル-ID01",
"ProductName": "チョコレート",
"NumberOfStock": 350
}
}

データはどのようになっているでしょうか。
まずは、在庫マスタです。

チョコレートの在庫が350に変更されていることが確認できます。

次に、履歴データを見てみましょう。

新しいレンジキーのタイムスタンプで、
履歴データが作成されていることを確認できました。

また、TTLを指定していますので履歴データは自動的に削除されます。

総括

ここまでの履歴データ管理は、
全てAppSync単体の機能で構築することが出来ています。
AppSyncは発想次第で様々な利活用の方法があり、とても楽しいですね!
GraphQL ✖︎︎ AWS AppSync ✖︎ データソースの組み合わせは、
次世代のデータAPIを彷彿とさせます!

このエントリーをはてなブックマークに追加