こんにちは。Nulab Appsチームの小松です。
本ブログでは、バッチ処理の実装を通じて得た知見について共有したいと思います。
目次
TL;DR
- Spring BootとSpring Batchを使用したバッチ処理の実装
 - AWSのマネージドサービスを使用したデータ運搬
 
要件
実現したいことは、Webアプリケーションで管理しているデータをSalesforceへ同期することです。
Salesforceへの同期のタイミングは、アプリケーションでのデータ変更に連動して行われることが望まれており、ある程度のリアルタイム性が求められていました。
Salesforce上のモジュール間には関連を定義することができ、今回は、その関連を多用する仕様となっていました。そのため、同期処理時に更新したSalesforceのオブジェクトのID管理やその紐付けなど、独自のロジックの実装が必要でした。
多くのデータを1日何度も同期する必要があったため、Salesforceへのデータ同期方法には、1リクエストで大量のデータ更新を行えるBulk API 2.0 [1] を採用しました。
Bulk API 2.0のリクエストはCSV形式で、Salesforce上の各モジュールごとに対し、1行目にモジュールの項目名、2行目以降に同期対象のデータを記載する仕様となっています。
同期処理の結果(成功、失敗等)についても、CSVの形式でレスポンスがSalesforceから返されます。
要件の概要図が図1です。図中の赤部分が、今回新たに実装する箇所です。
同期処理
上述した通り、この同期処理では、Salesforce上のIDの管理や、モジュール間の関連付けを行うための独自のロジックを実装する必要がありました。また、Bulk API 2.0のリクエストとレスポンスの形式がCSVなので、その生成やハンドリングを容易に行えるライブラリが求められていました。
そこで今回は、様々な種類のIO処理を容易に実現できるSpring Batch [2]を、同期処理の実装に使用しました。 普段の業務で書き慣れているSpringの作法やKotlinで実装ができることも理由の一つでした。
[2] https://spring.io/projects/spring-batch
また、運用の負荷を極力下げる目的で、この同期処理はコンテナ上で実行することにしました。その実行基盤には、導入の容易性から、Fargate を選択しました。Cloudwatch を併用すると、コンテナの起動タイミングを指定できることも決め手の一つでした。
Spring BootとSpring Batchを使用したバッチ処理の実装
コンテナ化と相性が良いSpring Bootの使用を前提とし、その依存関係の一つにSpring Batchを入れることにしました。
Spring Batchとは
Spring Batchの概要について説明します。
図2. Spring Batch概要 (https://docs.spring.io/spring-batch/docs/4.2.4.RELEASE/reference/html/domain.html#domainLanguageOfBatch より引用)
図2は、Spring Batchの概要図です。Spring Batchの主要な概念の関連が示されてます。以下、この図に則って用語の説明を行います。以下、ジョブ実装の例はKotlinを使用しています。
- JobLauncher
- 文字通り、ジョブの実行を司る役割を担います。
 - デフォルトでは、SimpleJobLauncherが使用されます。
- code1に示すrunメソッドからジョブを実行します。
 - 引数jobParametersは、ジョブの実行中参照可能なパラメータ群です。
 
 
 
public JobExecution run(final Job job, final JobParameters jobParameters)
- Job
- ジョブ全体に関する設定の管理を担います。
 - ジョブは一つ以上のStepから構成されます。
 - Jobが管理するのは、Stepの実行順序や実行方法です。
 - 設定は、JobBuilderFactoryを介して行います。
- Code2の例では、”testjob”という名前を持ち、step1, step2の順で処理を実行するジョブ設定となります。
 
 
 
fun job(): Job {
        return jobBuilderFactory
                .get("testjob") // ジョブ名
                .start(step1())
                .next(step2())
                .build()
}
- Step
- Stepに関する設定の管理を担います。
 - Stepは、以下の2つの種類に大別されます。
- Tasklet Step
- 一つのトランザクション内で行う一括処理。
 
 - Chunk Based Step
- チャンク単位で行う処理。チャンク単位でトランザクションが発生します。
 - 各処理は、最大以下の3つのパートから構成されます。
- ItemReader
- 各種データソースからの読み込み処理を担います。
 
 - ItemProcessor
- 中間処理を担います。
 - ItemReaderで読み込んだItemに対し、フィルタリング処理やマッピング処理を行います。
 
 - ItemWriter
- ItemReaderまたはItemProcessorで処理されたItemを各種データソースへ書き込む処理を担います。
 
 
 - ItemReader
 
 
 - Tasklet Step
 - 設定は、StepBuilderFactoryを介して行います。
- Code3は、chunk basedステップの設定例です。
- code3内のchunk<Request,Result>では、入出力の型をそれぞれ指定しています。ここでは入力の型はRequestで、出力の型はResultです。
- 型Requestは、ItemReaderによって、任意のデータソースから読み込んだデータがマッピングされる型です。
 - 型Resultは、任意のデータソースへの書き出しのために、ItemWriterへ渡される型です。
 
 - code3の設定例のStepでは、testReaderを使って読み込みを行い、testProcessorを使い中間処理をした後、testWriterで書き出しを行うことになります。
 
 - code3内のchunk<Request,Result>では、入出力の型をそれぞれ指定しています。ここでは入力の型はRequestで、出力の型はResultです。
 
 - Code3は、chunk basedステップの設定例です。
 
 
fun chunkBasedStep(): Step {
        return stepBuilderFactory
                .get("testChunkBasedStep") // ステップ名
                .chunk<Request,Result>(10)
                .reader(testReader())
                .processor(testProcessor()) 
                .writer(testWriter()) 
                .build()
}
- 
- 
- Code4は、taskletステップの設定例です。
- Tasklet型の引数をStepBuilderFactoryへ渡します
 
 
 - Code4は、taskletステップの設定例です。
 
 - 
 
fun taskletStep() : Step {
        return stepBuilderFactory
                .get("testTaskletStep") // ステップ名
                .tasklet(testTasklet())
                .build()
 }
- JobRepository
- ジョブの実行状態や各ステップの状態といったメタデータの記録や更新を担います。
 - デフォルトでは、SimpleJobRepositoryが使用され、各メタデータは、指定されたRDBへ記録されます。
 
 
Spring Batchを用いた同期処理のジョブフロー
以上を踏まえ、今回実装する同期処理をspring batchのジョブフローとして整理したのが図4です。
図4. Spring Batchを用いた同期処理のジョブフロー
上述したとおり、Jobは一つ以上のステップから構成されます。加えて、一つのジョブ内で、chunk-basedステップとtaskletステップは、混合させることが可能です。
SalesforceのID等の永続化が必要なデータは、RDS (MySQL)に格納するようにしました。また、業務アプリケーションからの同期リクエストも同様にRDS (MySQL)へ格納することにしました。これは、RDS関連のItemReader/ItemWriterの実装とその使用例が豊富だったためです。
図4のジョブフローは、以下の前処理、本処理、後処理の3つのステップに分かれています。
- 「同期リクエストをRDBから読み込み、Salesforceのリクエスト用のcsvに出力する」前処理
 - 「csvを読み込み、Bulk API 2.0を用いて同期処理を行い、その結果をcsvに出力する」本処理
 - 「csvから同期処理結果を読み込み、SalesforceのID等をRDSに保存する」後処理
 
以下、実装上の要点を重点的に取り上げ順に説明します。
「同期リクエストをRDBから読み込み、csvに出力する」前処理
前処理は、Chunk Based Stepとして定義します。
Chunk Based Stepを使用する場合は、入出力のデータソースを決め、その入出力間でデータがそれぞれどういう型にマッピングされるか定義する必要があります。
今回は、入力ソースがRDBで、出力ソースがFlat File (CSV)です。
また、リクエストのフィルタリング処理や加工処理を行うため、中間処理も必要となります。
RDBから一定数のレコードを読み込む必要があるため、ItemReaderには、JdbcPagingItemReaderを使用します。
各種必要な設定値は、JdbcPagingItemReaderBuilderを通じて設定します。ここで指定する型は、RDBから読み込んだレコードがマッピングされる型です。この例では、Request型としています。
code5は、JdbcPagingItemReaderの設定例です。
PagingQueryProviderは、データ読み込みの際に使用するクエリを組み立てるクラスです。
JdbcPagingItemReaderBuilderで設定している各種値は、pageSize以外は、特徴的なものはないので、説明を省略します。
PageSizeは、1クエリで取得するレコード数を指定します。したがって、この例では、最大100行のレコードを取得します。
@Bean
fun testItemReader(dataSource: DataSource?,
requestQueryProvider: PagingQueryProvider?,
@Value("#{jobParameters['to']}") to:Long?) : JdbcPagingItemReader<Request> {
        val parameterValues: MutableMap<String, Any> = mutableMapOf()
        parameterValues["to"] = Timestamp(to!!)
        return JdbcPagingItemReaderBuilder<Request>()
                .name("testItemReader")
                .dataSource(dataSource!!)
                .queryProvider(requestQueryProvider!!)
                .parameterValues(parameterValues)
                // The number of records to request per page/query.
                .pageSize(100)
                .rowMapper(RequestRowMapper())
                .build()
    }
@Bean
fun requestQueryProvider(dataSource: DataSource?) :
        SqlPagingQueryProviderFactoryBean {
        return SqlPagingQueryProviderFactoryBean()
            .apply {
                setDataSource(dataSource!!)
                setSelectClause("select *")
                setFromClause("from request")
                setWhereClause("where created < :to")
                setSortKeys(mapOf("id" to Order.DESCENDING))
            }
    }
中間処理のフィルタリング処理やマッピング処理では、独自のロジックが必要となるので、ItemProcessorとして、ItemProcessorAdapterを使用し、サービスクラスへ実処理を移譲します。
code6では、TestServiceクラスのtestMethodへ実処理を移譲しています。testMethodの引数はRequest型となり、戻り値はResult型となります。
また、フィルタリング処理の結果、後続の書き込み処理から該当のItemを除外したい場合は、移譲先のメソッドからnullを返すことで実現できます。そのため、testMethodからの戻り値はnullableな型を指定しています。
@Bean
    fun salesforceAccountMappingProcessor(
            testService: TestService?)
            : ItemProcessorAdapter<Request, Result> {
        return ItemProcessorAdapter<Request, Result>()
                .apply{
                    setTargetObject(testService!!)
                    setTargetMethod("testMethod")
                }
    }
interface TestService {
    fun testMethod(record: Request): Result?
}
最後に、ItemWriterの処理です。今回は、CSV形式のFlatFileとして書き出したいため、FlatFileItemWriterを使用します。指定する型はResult型です。これは、前段のItemProcessorからの戻り値となります。FlatFileItemWriterBuilderを介して設定を行います。
code7は、FlatFileItemWriterの設定例です。
出力先のファイルは、jobParametersから取得したoutputFileです。
FlatFileItemWriterにおける特徴的な概念はLineAggregatorです。
LineAggregatorの役割は、FlatFileItemWriterへ渡されたItem(この例ではResponse型)から、ファイル出力するプロパティを抽出し(FieldExtractor)、それらを行としてどのように表現するか指定(setFormat)することです。
code7では、インターフェースLineAggregatorの実装であるFormatterLineAggregatorを使用しています。
headerCallBackは文字通り、ファイルのヘッダへの書き出しを担います。
appendも文字通りで、すでに同名の出力対象のファイルが存在している場合は、そのファイルへ追記するか否かのフラグです。この例ではtrueとなっているため、ファイルがすでに存在している場合は、チャンク単位でファイルへの追記が行われます。
@Bean
fun testWriter(
  @Lazy @Value("#{jobParameters['outputFile']}") outputFile: Resource?) : FlatFileItemWriter<Result> {
        val fieldExtractor =
                BeanWrapperFieldExtractor<Result>()
                        .apply {
                            setNames(["propertyA", "propertyB"]))
                            afterPropertiesSet()
                        }
        val lineAggregator = FormatterLineAggregator<Result>()
                .apply {
                    setFormat("%s, %s")
                    setFieldExtractor(fieldExtractor)
                }
        return FlatFileItemWriterBuilder<Result>()
                .name("testItemWriter")
                .resource(outputFile!!)
                .lineAggregator(lineAggregator)
                .headerCallback(TestHeaderCallBack(["headerA","headerB"]))
                .append(true)
                .build()
}
class TestHeaderCallback(val header:Array<String>) : FlatFileHeaderCallback {
    override fun writeHeader(writer: Writer) {
        writer.write(header.joinToString(","))
    }
}
以上のItemReader, ItemProcessor, ItemWriterを用いてChunk based stepとして定義した例がcode8です。
code5-7で見てきたように、この前処理では、RDBから読み込んだレコードがRequest型にマッピングされた後、中間処理でResult型にマッピングされ、最終的にCSV形式でFlatfileへ書き出されます。
そのため、code8内のchunk句では、ItemReaderからItemWriter間でマッピングされる型をそれぞれ、Request, Resultと指定しています。
また、その後に続く整数値は、コミットを行うレコード数の単位を表します。
code5で指定したpageSizeは100でした。そのため、code8のStep設定例だと、1つのトランザクションでStep処理が完結します。ItemReaderは最大100レコードを読み込み、このStepでは、100個までのレコードを一つのチャンク(トランザクション)として扱うからです。
faultTolerant()を設定すると、ステップ単位でのリトライや失敗時のスキップの設定を追加することができます。skipPolicyでは、skipする例外を任意に指定することができます。
fun preStep(): Step {
        return stepBuilderFactory
                .get("preStep")
                .chunk<Request,Result>(100) // IOの型の指定/コミットの間隔を指定
                .reader(testReader())
                .processor(testProcessor())
                .writer(testWriter())
                .faultTolerant() // リトライやスキップの追加設定
                .skipPolicy(customeSkipPolicy)
                .build()
}
「csvを読み込み、Bulk API 2.0により同期処理を行い、その結果をcsvに出力する」本処理
本処理では、前処理で出力したCSVファイルの内容を読み込み、Bulk API 2.0を使用し同期処理を行います。同期処理の結果がCSVの形式でSalesforceから返されるので、それをそのままCSVファイルとして出力します。
本処理は、チャンク単位の処理をする必要がないので、Tasklet Stepとして定義します。
Taskletステップで行う処理は、Taskletインターフェースのexecuteメソッドを実装します。
code10は、Taskletの実装およびStepの設定です。
Taskletの実装では、executeメソッドで処理が正常終了した場合には、RepeatStatus.FINISHEDを返します。これにより、ステップが正常終了したことをジョブに伝えます。
Stepの設定では、独自に定義したTaskletをstepBuilderFactoryに渡しています。
class TestTasklet : Tasklet, StepExecutionListener {
  override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus {
	...
	return RepeatStatus.FINISHED
}
@Bean
    fun synchronizationStep() : Step {
        return  stepBuilderFactory
                .get("synchronizationStep")
                .tasklet(TestTasklet())
                .build()
    }
TestTaskletで実装しているStepExecutionListenerには、beforeStep, afterStepメソッドがそれぞれ定義されており、Tasklet.executeの前後で行いたい処理等あればこちらに実装することが可能です。
public interface StepExecutionListener extends StepListener {
    void beforeStep(StepExecution var1);
    @Nullable
    ExitStatus afterStep(StepExecution var1);
}
「csvから同期処理結果を読み込み、SalesforceのID等をRDSに保存する」後処理
後処理は、前処理と同様Chunk Based Stepで定義します。
入出力ソースは、前処理とは逆で、入力ソースがFlat File (CSV)で、出力ソースがRDBとなります。
また、後処理でもフィルタリング処理や加工処理を行うため、ItemProcessorが必要となります。
CSVから読み込んだItemがマッピングされる型はRequest2で、RDBへの書き込みの際に渡される型はResult2とします。
ItemReaderには、FlatFileItemReaderを使用します。
Flat file (CSV)を型にマッピングするために、LineTokenizerとFieldSetMapperを指定します。
LineTokenizerが担うのは、ファイル内の各行をコンテキストに応じてパースし、FieldSetにマッピングすることです。FieldSetは、パースされた各文字列がどのJavaのネイティブ型に変換されるかを管理します。
一方FieldSetMapperは、LineTokenizerから返されるFieldSetを型にマッピングする役割を担います。
code15で、FlatFileItemReaderの設定を行います。
CSVファイルのheader部分は処理をする必要ないので、マッピング処理からスキップします。
linesToSkip(1)でファイルの1行目をスキップすることを指定し、skippedLinesCallbackで、skip該当行(i.e. 1行目)のコールバック処理を指定します。ここでは、CSVの先頭行に対しては何も処理をする必要がないので、空処理で実装しています。
@Bean
fun testReader2(
  @Lazy @Value("#{jobParameters['inputFile']}") inputFile: Resource)
    : FlatFileItemReader<Request2> {
        return FlatFileItemReaderBuilder<Request2>()
                .name("testReader2")
                .lineTokenizer(testTokenizer())
                .fieldSetMapper(testFieldSetMapper())
                .resource(inputFile!!)
                .linesToSkip(1) // to skip header
                .skippedLinesCallback(SkippedLineCallback())
                .build()
    }
class TestTokenizer() : LineTokenizer {
    private val fieldSetFactory = DefaultFieldSetFactory()
    override fun tokenize(csvLine: String?): FieldSet {
        if (csvLine == null) throw Exception()
        val fields = csvLine.split(",").toTypedArray()
        val names = arrayOf("result1", "result2")
        return  fieldSetFactory.create(fields, names)
    }
}
data class Request2 (val property3: String, val property4: Int)
class TestFieldSetMapper : FieldSetMapper<Request2> {
    override fun mapFieldSet(fs: FieldSet): Request2 {
        return Request2(
                result1 = fs.readString("result1"),
                result2 = fs.readInt("result2"))
    }
}
class SkippedLineCallback : LineCallbackHandler {
    // do nothing
    override fun handleLine(line: String) {
    }
}
中間処理は、前処理と同様に、ItemProcessorAdapterを使用し、サービスクラスへ実処理を移譲します。実装方法は同じなので省略します。
RDBへの書き出しを行うため、ItemWriterは、JdbcBatchItemWriterを使用します。
JdbcBatchItemWriterBuilderを介して設定を行います。
@Bean
fun testJdbcItemWriter(dataSource: DataSource?):  JdbcBatchItemWriter<Result2> {
    return JdbcBatchItemWriterBuilder<Result2>()
      .dataSource(dataSource!!)
      .sql("INSERT INTO result_table " +
            "(external_id, result) " +
             "VALUES (?, ?)")
      .itemPreparedStatementSetter(TestItemPreparedStatementSetter())
      .build()
    }
class TestItemPreparedStatementSetter :
        ItemPreparedStatementSetter<Result2> {
    @Throws(SQLException::class)
    override fun setValues(item: Result2, ps: PreparedStatement) {
            ps.setString(1, item.result1)
            ps.setString(2, item.result2)
        }
    }
}
クラス名にBatchがついているのは、指定されたチャンク数のItemの書き込みを一括で担うためです。code17は、JdbcBatchItemWriter.writeのメソッド内の一部抜粋です。チャンク分のitemをため込んだ後、クエリを実行していることがわかります。
while(var2.hasNext()) {
    T item = var2.next();
    JdbcBatchItemWriter.this.itemPreparedStatementSetter.setValues(item, ps);
    ps.addBatch();
}
return ps.executeBatch();
複数のテーブルに対してクエリを実行したいような場合は、CompositeItemWriterを使用します。設定方法はcode18の通りで、code16で設定したようなJdbcBatchItemWriterをリスト形式で、delegateに指定します。こうすることで、書き込み処理が、このリストに指定された順で実行されるようになります。
@Bean
fun testCompositeItemWriter() :
  CompositeItemWriter<Result2> {
  return CompositeItemWriterBuilder<Result2>()
    .delegates(
      listOf(testJdbcItemWriter(null),
        testJdbcItemWriter2(null)))
    .build()
}
code19が後処理のstep設定例です。code15, code18で設定したreader, writerを指定しています。
fun postStep(): Step {
        return stepBuilderFactory
                .get("postStep")
                .chunk<Request,Result>(100)
                .reader(testReader2())
                .processor(testProcessor2())
                .writer(testCompositeWriter()) // compositeItemWriterを指定
                .faultTolerant()
                .skipPolicy(customeSkipPolicy)
                .build()
}
ジョブ設定
以上の前処理、本処理、後処理の各ステップを実行するジョブ処理の設定例がcode20です。
今回のジョブでは、それぞれのステップ処理をシーケンシャルに実行したいので、startとnextを使用し順にステップを指定しています
@Bean
    fun job(): Job {
        return this.jobBuilderFactory
                .get("salesforceSynchronizationJob")
                .incrementer(RunIdIncrementer())
                .start(preStep)
                .next(synchronizationStep)
                .next(postStep)
                .build()
AWSのマネージドサービス群でデータ運搬
最後に、アプリケーションからSalesforceの同期リクエストを管理するテーブルへリクエストデータを運搬するAWSの構成について説明します。
構成としては図8に示す通りです。
アプリケーションからの同期リクエストは、API Gatewayで受け付けます。
同期対象のSalesforceのモジュールは複数あります。また、更新、削除、作成と操作が異なる場合には必要となるリクエストも変わります。
そのため、各モジュール×各操作の数のエンドポイントをAPI Gatewayに作成しました。
各エンドポイントにPostされたリクエストデータは、API GatewayのAWS Proxy を使用し、後続のSQSに渡します。SQSとAPIのエンドポイントは対の関係となっています。
SQSには標準キューを使用しました。リクエストの順序性は、アプリケーションで担保し、SQSではハンドリングできる流量を重視しました。
これらのSQSにLambdaを連動させました。Amazon Aurora ServerlessのData APIを使用することで、Lambdaからテーブルに各リクエストをインサートしています。
(実装時は、LambdaのRDS proxyがまだ存在しておらず、その当時、最も容易にRDSへのインサートが行えるのがこの方法だと判断しました。)
Data APIは、SDKが提供されており、ドキュメンテーションに記載の通り実装することで、データベースへの各種操作を実施することができました。
API Gatewayのエンドポイントごとにリクエストテーブルが存在するため、各Lambdaの環境変数でインサート先のテーブルを指定するようにしました。
バッチ処理の実行基盤には、Fargateを使用しました。Cloud Watch Event を使用して、ECS Taskを起動し、バッチを定期実行するようにしています。
結び
Spring BootとSpring Batchを使用したバッチの実装方法とAWSのサービスを使ったデータ運搬方法について本ブログでは説明させてもらいました。
まず、Spring Batchを使用したバッチ実装に関する所感です。
Spring Batch自体は成熟したライブラリで、IOのデフォルト実装がかなり充実しています。なので、まずはそれらのデフォルト実装に則ってジョブの実装ができないか検討した方がよいと思いました。Spring Batchが用意しているBuilder群を介した設定を行うことを心がけました。
Spring Batchが推奨する実装方法を知る上で、以下の書籍を参照しました。
The Definitive Guide to Spring Batch – Modern Finite Batch Processing in the Cloud
https://www.apress.com/gp/book/9781484237236
公式のドキュメンテーションも充実してるのですが、こちらの本はもう少し実践的で、何より開発リーダーの方が執筆されているので、実装の裏側にはある理念みたいなものも垣間見ることができてよかったです。
次にAWSを駆使したデータ運搬に関しての所感です。
基本的には、ドキュメンテーションを読んで、公式に提供されている機能を正しく使うことで、望んでいるデータ運搬を実現することができました。
複数のAWSサービスを経由した構成となっているので、各サービスで同期リクエストの状態を記録しておく必要がありました。大方のサービスでは、容易にCloud Watch Logsにログを書き出せるようになっているので、調査の際に必要となる情報をログに出しておきさえすれば、後から追えるようになっているのはよかったです。




