Spring Batch

Spring Batch Flow

[Spring] 1. Spring Batch의 기본 개념

  • Job > Step > Task 구조

    • Job은 Step을 갖고 있고, Step은 Tasklet 인터페이스를 통해 수행 작업을 갖고 있다.

    • Task는 기본적인 사용자 정의 형식read/process/write (RPW) 형식이 존재한다.

Spring Batch Metadata

[Spring] 1. Spring Batch의 기본 개념

Spring Batch는 DB를 통해 완료/실패 등 상태관리를 한다. 크게 4가지의 상태를 저장한다.

  1. 이전 실행 job history

  2. 실패한 batch와 parameter / 성공한 job

  3. 실행 재개 지점

  4. Job 기준 step 현황과 성공 / 실패 여부

여러 가지 테이블이 DB에 생성되는데, H2 DB 사용시 자동으로 생성되나 그외 DB는 직접 생성해야 한다. DDL 쿼리는 org.springframework.batch.core에 포함되어 있다.

Spring Batch 정리

  • spring-starter-batch dependency 추가

  • task scheduling 대체가 아닌, scheduler와 함께 작동하도록 만들어짐

  • 유즈 케이스

    • 데이터베이스, 파일, 큐로부터 커다란 수의 레코드를 읽어들임

    • 데이터 처리

    • 조작한 데이터를 다시 작성

  • 주기적으로 배치 프로세스를 실행하거나, 병렬적으로 작업을 처리하거나, 메시지 주도적으로 작업을 처리하는 등의 비즈니스 시나리오도 존재한다.

  • 스프링 배치 계층 구조 (Spring Batch Layered Architecture)

  • Application은 모든 배치 작업과 스프링 배치로 작성한 개발자의 코드를 담고 있다.

  • Batch Core는 배치 작업을 실행하고 관리하기 위한 주요 런타임 클래스를 담고 있다. JobLauncher, Job, Step의 구현체를 포함한다.

  • Application과 Core 둘 다 동일한 infrastructure 위에서 작동한다. Infrastructure은 개발자들이 사용할 수 있는 공통 reader (예를 들면 ItemReader)와 writer (예를 들면 ItemWriter), service (예를 들어 RetryTemplate)를 포함한다. 그리고 코어 프레임워크를 포함한다. (코어 프레임워크 자체도 라이브러리이다.)

  • 가이드라인

    • 사용할 데이터를 배치와 가까이 두자. 즉, 처리를 할 곳에 데이터를 가까이 두잔 뜻.

    • I/O를 줄이고 내장 메모리 위에서 작업을 많이 하자.

    • 모든 트랜잭션에 대해서 매번 데이터를 읽지 말고 한번에 읽고 캐시를 해두던지 작업 중인 저장소에 보관해두자.

    • 같은 배치 작업을 두 번 실행하지 말자.

    • 시간이 오래 걸리는 재할당에 시간을 줄이기 위해서 애초에 배치 애플리케이션에 메모리를 적당히 할당해두자.

SpEL (Spring Expression Language)

Spring Expression Language(SpEL) 에 대해

SpEL (Spring Expression Language)런타임에서 객체에 대한 쿼리(query)와 조작(manipulation)을 지원하는 강력한 표현 언어이다.

SpEL 표현식은 # 기호로 시작하며 중괄호로 묶어서 표현한다. #{표현식}

속성 값을 참조할 때는 $ 기호와 중괄호로 묶어서 표현한다. ${property.name}

Spring Batch

배치의 일반적인 시나리오는 읽기 - 처리 - 쓰기로 나누어진다.

배치 관련 객체 관계도

JobStep1:M, StepItemReader, ItemProcessor, ItemWriter1:1 관계를 갖는다.

Job이라는 하나의 큰 일감(Job)에 여러 단계(Step)를 두고, 각 단계를 배치의 기본 흐름대로 구현한다.

Job

Job 은 배치 처리 과정을 하나의 단위로 만들어 표현한 객체이다. 전체 배치 처리에 있어 항상 최상단 계층에 있다. Job 객체는 여러 Step 인스턴스를 포함하는 컨테이너다.

Job 객체를 만드는 빌더는 여러 개가 있다. JobBuilderFactory로 원하는 Job을 만들 수 있다. JobBuilderFactoryget() 메서드로 JobBuilder를 생성하고 이를 응용하면 된다. org.springframework.batch.core.Configuration.annotation.JobBuilderFactory의 내부 코드이다. (길다 길어!)

// org.springframework.batch.core.Configuration.annotation.JobBuilderFactory
public class JobBuilderFactory {
	private JobRepository jobRepository;

	public JobBuilderFactory(JobRepository jobRepository) {
			this.jobRepository = jobRepository;
	}

	public JobBuilder get(String name) {
			JobBuilder builder = new JobBuilder(name).repository(jobRepository);
			return builder;
	}
}

get() 메서드를 호출할 때마다 새로운 JobBuilder 인스턴스를 반환한다. 그리고 매번 생성할 때마다 JobBuilderFactory 를 생성할 때 주입받은 JobRepository 를 사용할 repository로 설정한다. 즉, 해당 JobBuilderFactory 에서 생성되는 모든 JobBuilder 가 동일한 리포지토리를 사용한다.

아래는 JobBuilder 코드 일부이다.

// org.springframework.batch.core.job.builder.JobBuilder

// ...

public SimpleJobBuilder start(Step step) {
	return new SimpleJobBuilder(this).start(step);
}

public JobFlowBuilder start(Flow flow) {
	return new JobFlowBuilder(this).start(flow);
}

public JobFlowBuilder flow(Step step) {
	return new JobFlowBuilder(this).start(step);
}

// ...

공통점은 모두 빌더를 반환한다는 점이다. JobBuilderJob을 직접 생성하는 것이 아닌 별도의 구체적인 빌더를 만들어 반환한다. 이렇게 빌더를 생성하게끔 하는 이유는, 경우에 따라 Job 생성 방법이 다르기 때문이다. 구체적인 빌더를 구현하고 이를 통해 Job 생성이 이루어지게 하는 의도로 파악된다.

빌더를 받아 사용해야 하므로 불편해보이지만, 메서드 체인 방식을 이용하면 구체적인 빌더의 존재를 생각하지 않아도 될 만큼 손쉽게 처리할 수 있다.

메서드를 살펴보면 Job을 생성하기 위한 Step 또는 Flow를 파라미터로 받아 구체적인 빌더를 생성하고 있다. JobStep 또는 Flow 인스턴스의 컨테이너 역할을 하기 때문에 생성하기 전에 인스턴스를 전달받는다.

다음은 Job 생성 예제 코드이다.

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Bean
public Job simpleJob() {
    return jobBuilderFactory.get("simpleJob")  // 'simpleJob' 이라는 이름을 가진 Job을 생성할 수 있는 `JobBuilder` 객체 인스턴스 반환
                .start(simpleStep())  // `simpleStep()`은 간단한 `Step` 인스턴스를 생성해 반환하는 메서드라 가정한다. `start()` 메서드로 인해 생성되는 빌더는 `SimpleJobBuilder`
                .build();  // 'simpleJob'이라는 이름을 가진 `Job`이 생성되어 반환
}

JobInstance

JobInstance 는 배치에서 Job 이 실행될 때 하나의 Job 실행 단위이다. 만약 하루에 한 번씩 배치의 Job 이 실행된다면, 어제와 오늘 실행한 각각의 JobJobInstance 라고 부를 수 있다.

그렇다면 각각의 JobInstance 는 하나의 JobExecution (JobInstance에 대한 한 번의 실행을 나타내는 객체)을 갖고 있을까? 그렇지 않다.

오늘 Job을 실행했는데 실패했다면 다음날 동일한 JobInstance 를 가지고 또 실행한다. Job 실행이 실패하면 JobInstance 가 끝난 것으로 간주하지 않기 때문이다.

그러면 JobInstance어제의 실패한 JobExecution 과 오늘 성공한 JobExecution 두 개를 갖게 된다. 즉, JobExecution을 여러 개 가질 수 있다.

JobExecution

JobExecutionJobInstance에 대한 한 번의 실행을 나타내는 객체이다.

위의 예제를 그대로 가져와 설명하자면, 만약 오늘의 Job이 실패해 내일 다시 동일한 Job 을 실행하면 오늘, 내일의 실행 모두 같은 JobInstance 를 사용할 것이다. 단, 오늘, 내일의 실행은 각기 다른 JobExecution 을 생성한다.

JobExecution 인터페이스를 보면 Job 실행에 대한 정보를 담고 있는 도메인 객체라는 것을 알 수 있다. JobExecutionJobInstance, 배치 실행 상태, 시작 시간, 끝난 시간, 실패했을 때의 메시지 등의 정보를 담고 있다.

다음은 JobExecution 내부의 코드이다.

// org.springframework.batch.core.JobExecution
public class JobExecution extends Entity {

	private final JobParameters jobParameters;
	private JobInstance jobInstance;
	private volatile Collection<StepExecution> stepExecutions = Collections.synchronizedSet(new LinkedHashSet<>());
	private volatile BatchStatus status = BatchStatus.STARTING;
	private volatile Date startTime = null;
	private volatile Date createTime = new Date(System.currentTimeMillis());
	private volatile Date endTime = null;
	private volatile Date lastUpdated = null;
	private volatile ExitStatus exitStatus = ExitStatus.UNKNOWN;
	private volatile ExecutionContext executionContext = new ExecutionContext();
	private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<>();
	private final String jobConfigurationName;

		// ...
}
  • jobParameters : Job 실행에 필요한 매개변수 데이터.

  • jobInstance : Job 실행의 단위가 되는 객체.

  • stepExecutions : StepExecution을 여러 개 가질 수 있는 Collection 타입.

  • status : Job실행 상태. (COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, UNKNOWN 등이 있다. default는 STARTING)

  • startTime : Job 이 실행된 시간. null 이면 시작하지 않았다는 뜻.

  • createTime : JobExecution 이 생성된 시간.

  • endTime : JobExecution이 끝난 시간.

  • lastUpdated : 마지막으로 수정된 시간.

  • exitStatus : Job 실행 결과에 대한 상태. (UNKNOWN, EXECUTING, COMPLETED, NOOP, FAILED, STOPPED 등이 있다. default는 UNKNOWN)

  • executionContext : Job 실행 사이에 유지해야 하는 사용자 데이터가 들어 있다.

  • failureExceptions : Job 실행 중 발생한 예외List 에 넣어둔다.

  • jobConfigurationName : Job 설정 이름.

JobParameters

JobParametersJob 이 실행될 때 필요한 파라미터들을 Map 타입으로 저장하는 객체이다.

JobParametersJobInstance 를 구분하는 기준이 되기도 한다. 예를 들어 Job 하나를 생성할 때, 시작 시간 등의 정보를 파라미터로 해서 하나의 JobInstance 를 생성한다.

즉, JobInstanceJobParameters 는 1:1 관계이다. 파라미터의 타입으로는 String , Long , Date , Double 을 사용할 수 있다.

Step

Step은 실질적인 배치 처리를 정의하고 제어하는 데 필요한 모든 정보가 들어 있는 도메인 객체이다. Job 을 처리하는 실질적인 단위로 쓰인다.

모든 Job 에는 1개 이상의 Step이 있어야 한다.

StepExecution

JobJobExecution 이라는 Job 실행 정보가 있다면, Step에는 StepExecution 이라는 Step 실행 정보를 담는 객체가 있다. 각각의 Step 이 실행될 때마다 StepExecution 이 생성된다.

다음은 StepExecution 클래스이다.

public class StepExecution extends Entity {

	private final JobExecution jobExecution;
	private final String stepName;
	private volatile BatchStatus status = BatchStatus.STARTING;
	private volatile long readCount = 0;
	private volatile long writeCount = 0;
	private volatile long commitCount = 0;
	private volatile long rollbackCount = 0;
	private volatile long readSkipCount = 0;
	private volatile long processSkipCount = 0;
	private volatile long writeSkipCount = 0;
	private volatile Date startTime = null;
	private volatile Date createTime = new Date(System.currentTimeMillis());
	private volatile Date endTime = null;
	private volatile Date lastUpdated = null;
	private volatile ExecutionContext executionContext = new ExecutionContext();
	private volatile ExitStatus exitStatus = ExitStatus.EXECUTING;
	private volatile boolean terminateOnly;
	private volatile long filterCount;
	private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<>();

	// ...
}
  • jobExecution : 현재의 JobExecution 정보.

  • stepName : Step의 이름.

  • status : Step실행 상태. (COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, UNKNOWN 등이 있다. default는 STARTING.)

  • readCount : 성공적으로 읽은 레코드 수.

  • writeCount : 성공적으로 쓴 레코드 수.

  • commitCount : Step의 실행에 대해 커밋된 트랜잭션 수.

  • rollbackCount : Step의 실행에 대해 롤백된 트랜잭션 수.

  • readSkipCount : 읽기에 실패해 건너뛴 레코드 수.

  • processSkipCount : 프로세스가 실패해 건너뛴 레코드 수.

  • writeSkipCount : 쓰기에 실패해 건너뛴 레코드 수.

  • startTime : Step이 실행된 시간. null이면 시작하지 않았다는 뜻.

  • endTime : Step실행 성공 여부와 관련 없이 Step이 끝난 시간.

  • lastUpdated : 마지막으로 수정된 시간.

  • executionContext : Step 실행 사이에 유지해야 하는 사용자 데이터가 들어 있다.

  • exitStatus : Step 실행 결과에 대한 상태. (UNKNOWN, EXECUTING, COMPLETED, NOOP, FAILED, STOPPED 등이 있다. default는 UNKNOWN.)

  • terminateOnly : Job 실행 중지 여부.

  • filterCount : 실행에서 필터링된 레코드 수.

  • failureExceptions : Step 실행 중 발생한 예외를 List 타입으로 저장한다.

JobRepository

JobRepository 는 배치 처리 정보를 담고 있는 메커니즘이다. 어떤 Job이 실행되었으며, 몇 번 실행되었고, 언제 끝났는지 등 배치 처리에 대한 메타데이터를 저장한다.

예를 들어 Job 하나가 실행되면 JobRepository 에서는 배치 실행에 관련된 정보를 담고 있는 도메인인 JobExecution 을 생성한다.

JobRepositoryStep 의 실행 정보를 담고 있는 StepExecution 도 저장소에 저장하며, 전체 메타데이터를 저장 및 관리하는 역할을 한다.

JobLauncher

JobLauncherJob , JobParameters 와 함께 배치를 실행하는 인터페이스이다. 인터페이스는 run() 하나이다.

// org.springframework.batch.core.launch.JobLauncher
public interface JobLauncher {
	public JobExecution run(Job job, JobParameters jobParameters) throws ...
}

매개변수로 JobJobParameters를 받아 JobExecution을 반환한다. 매개변수가 이전과 동일하면서 이전에 JobExecution이 중단된 적 있다면 동일한 JobExecution을 반환한다.

ItemReader

ItemReaderStep의 대상이 되는 배치 데이터를 읽어오는 인터페이스이다. 파일, XML, CSV, DB 등 여러 타입의 데이터를 읽어올 수 있다.

// org.springframework.batch.item.ItemReader
public interface ItemReader<T> {
	T read() throws Exception, UnexpectedException, ParseException, NonTransientResourceException;
}

ItemReader 에서 read() 메서드의 반환 타입을 제네릭 <T> 으로 구성했기 때문에 직접 타입을 지정할 수 있다.

위에서 설명한 읽기-처리-쓰기에서 읽기를 담당한다고 볼 수 있겠다!

ItemProcessor

ItemProcessorItemReader 로 읽어온 배치 데이터를 변환하는 역할을 수행한다.

읽기-처리-쓰기 에서 처리를 담당한다고 볼 수 있겠다.

굳이 ItemWriter가 아니라 ItemProcessor라는 인터페이스를 분리한 이유는 두 가지다.

  1. 비즈니스 로직을 분리하기 위해서이다. 각각 읽기-처리-쓰기를 담당하게 해 역할을 명확히 분리한다.

  2. Input의 타입과 Output의 타입이 다를 수 있다. Input과 Output의 타입이 ItemProcesor의 제네릭 <I, O>에 들어가게 되니 더 직관적이다.

// org.springframework.batch.item.ItemProcesor
public interface ItemProcessor<I, O> {
	O process(I item) throws Exception;
}

ItemWriter

ItemWriter 는 배치 데이터를 저장한다. 일반적으로 DB 또는 파일에 저장한다.

읽기-처리-쓰기에서 마지막 단계인 쓰기를 담당한다.

// org.springframework.batch.item.ItemWriter
public interface ItemWriter<T> {
	void write(List<? extends T> items) throws Exception;
}

ItemWriterItemReader와 비슷한 방식으로 구현하면 된다.

write() 메서드는 List 자료구조를 이용해 지정한 타입의 리스트를 매개변수를 받는다. 리스트의 데이터 수는 설정한 청크 (chunk) 단위로 불러온다.

write() 메서드는 void 함수라서 반환 값은 따로 없다. 매개변수로 받은 데이터를 저장하는 로직만을 구현하면 된다.

지연 생성 (Lazy Initialization)

  • 메모리 절약 방법.

  • 지연 초기화: 사용자가 실제로 필요할 때만 로딩을 하여 데이터 낭비를 줄이는 방법이다. 즉, 개체를 처음 사용할 때까지 생성을 지연시킨다는 의미이다. Lazy Loading ↔ Eager Loading

  • 참조형 변수의 메모리 할당을 변수를 선언하는 시점에 하지 않고 처음 사용되는 시점에 하는것

  • e.g., 스프링에서 기본 빈 생성은 싱글턴인데, Spring Batch를 사용할 때 @StepScope를 사용하면 해당 메서드는 Step의 주기에 따라 새로운 빈을 생성한다. 즉, Step의 실행마다 새로 빈을 만들기 때문에 지연 생성이 가능하다. — 처음 배우는 스프링 부트 2, p261

청크 지향 프로세싱 (chunk oriented processing)

**청크 지향 프로세싱 (chunk oriented processing)**이란 트랜잭션 경계 내에서 청크 단위로 데이터를 읽고 생성하는 프로그래밍 기법이다.

**청크(chunk)**란 아이템이 트랜잭션에서 커밋되는 수를 뜻한다.

read한 데이터 수가 지정한 청크 단위(CHUNK_SIZE)와 일치하면, write를 수행하고 트랜잭션을 커밋한다.

청크 지향 프로세싱의 이점은?

청크로 나누지 않았을 때는 1000개 중 하나만 실패해도 나머지 999개의 데이터가 롤백된다.

그런데 청크 단위를 10으로 해서 배치 처리를 하면 도중에 배치 처리에 실패하더라도 다른 청크는 영향을 받지 않는다.

따라서 스프링 배치에서는 청크 단위의 프로그래밍을 지향한다.

청크 지향 프로세싱이 아닌 방식 — Tasklet

청크 지향 프로세싱이 아닌 방식은 Tasklet을 이용하는 방식이다.

Tasklet은 임의의 Step을 실행할 때 하나의 작업으로 처리하는 방식이다.

읽기, 처리, 쓰기로 나뉜 방식청크 지향 프로세싱이라면, 이를 단일 개념으로 만든 것이 Tasklet이라고 할 수 있다.

Tasklet 인터페이스는 내부에 execute() 메서드 하나만 지원한다. 내부에 원하는 단일 작업을 구현하고 작업이 끝나면 RepeatStatus.FINISHED를 반환한다. 작업이 계속된다면 RepeatStatus.CONTINUABLE을 반환한다.

Flow 제어

읽기-처리-쓰기 가 부족할 때가 있다. 세부적인 조건이 추가되거나, 특정 조건에 따라 Step의 실행 여부를 결정하고 싶을 때 쓸 수 있는 것이 있는데, 바로 스프링 배치에서 흐름을 제어하기 위해 제공되는 Flow이다.

멀티 스레드로 여러 개의 Step 실행하기

보통 배치 처리당 스레드 하나만 실행할 경우가 대부분이지만, 상황에 따라 여러 Step을 동시에 실행하는 경우도 있다. 스프링 부트 배치는 멀티 스레드로 Step을 실행하는 여러 전략을 제공한다.

  1. TaskExecutor 를 사용해 여러 Step 동작시키기

  2. 여러 개의 Flow 실행시키기

  3. 파티셔닝을 이용한 병렬 프로그래밍

REF

그 외 읽어보면 좋은 글을 모아본다.

Spring Batch와 Querydsl | 우아한형제들 기술블로그

Last updated