programing

Spring Batch에서 Job의 여러 단계간에 데이터를 어떻게 공유 할 수 있습니까?

coolbiz 2021. 1. 14. 22:59
반응형

Spring Batch에서 Job의 여러 단계간에 데이터를 어떻게 공유 할 수 있습니까?


Spring Batch를 파헤 치면서 Job의 여러 단계간에 데이터를 공유 할 수있는 방법을 알고 싶습니다.

이를 위해 JobRepository를 사용할 수 있습니까? 그렇다면 어떻게 할 수 있습니까?

같은 일을 / 달성하는 다른 방법이 있습니까?


작업 저장소는 단계간에 데이터를 전달하는 데 간접적으로 사용됩니다 (Jean-Philippe는 데이터를에 넣은 StepExecutionContext다음 자세한 이름 ExecutionContextPromotionListener사용 하여 단계 실행 컨텍스트 키를 JobExecutionContext.

JobParameter키를로 승격하기위한 리스너가 있다는 것을 알아두면 도움이됩니다 StepExecutionContext(더 자세한 이름은 JobParameterExecutionContextCopyListener). 작업 단계가 서로 완전히 독립적이지 않은 경우이를 많이 사용한다는 것을 알게 될 것입니다.

그렇지 않으면 JMS 대기열이나 (천국 금지) 하드 코딩 된 파일 위치와 같은 더욱 정교한 체계를 사용하여 단계간에 데이터를 전달하게됩니다.

컨텍스트에서 전달되는 데이터의 크기에 관해서는 작게 유지하는 것이 좋습니다 (하지만 데이터에 대한 구체적인 내용은


단계에서 데이터를 StepExecutionContext. 그런 다음 리스너를 사용하여 데이터를에서 StepExecutionContext승격 할 수 있습니다 JobExecutionContext.

이는 JobExecutionContext다음 모든 단계에서 사용할 수 있습니다.

주의 : 데이터는 짧아야합니다. 이러한 컨텍스트는 JobRepository직렬화 의해 저장되며 길이가 제한됩니다 (잘 기억하는 경우 2500 자).

따라서 이러한 컨텍스트는 문자열이나 단순한 값을 공유하는 데 적합하지만 컬렉션이나 막대한 양의 데이터를 공유하는 데는 적합하지 않습니다.

엄청난 양의 데이터를 공유하는 것은 Spring Batch의 철학이 아닙니다. Spring Batch는 거대한 비즈니스 처리 단위가 아닌 별개의 작업 집합입니다.


세 가지 옵션이 있습니다.

  1. 사용 StepContext및 홍보하고 JobContext각 단계에서 액세스 할 수 있습니다. 명시된대로 크기 제한을 준수해야합니다.
  2. @JobScope빈을 생성 하고 해당 빈에 데이터를 추가하고 @Autowire필요한 곳에 사용합니다 (단점은 메모리 내 구조이며 작업이 실패하면 데이터가 손실되면 재시작 가능성 문제가 발생합니다).
  3. 단계 (csv의 각 줄 읽기 및 DB에 쓰기, DB에서 읽기, 집계 및 API로 보내기)에 걸쳐 처리해야하는 더 큰 데이터 세트가 있었기 때문에 스프링 배치 메타 테이블과 동일한 DB에있는 새 테이블의 데이터를 모델링하기로 결정했습니다. idsJobContext액세스 필요하고 작업이 성공적으로 완료 될 때 임시 테이블을 삭제할 때.

Java Bean 객체를 사용할 수 있습니다.

  1. 한 단계 실행
  2. 결과를 Java 객체에 저장
  3. 다음 단계는 1 단계에서 저장된 결과를 얻기 위해 동일한 Java 객체를 참조합니다.

이러한 방식으로 원하는 경우 방대한 양의 데이터를 저장할 수 있습니다.


다음은 단계를 통해 액세스 할 수있는 개체를 저장하기 위해 수행 한 작업입니다.

  1. 작업 컨텍스트에서 개체를 설정하기위한 리스너 생성
@Component("myJobListener")
public class MyJobListener implements JobExecutionListener {

    public void beforeJob(JobExecution jobExecution) {

        String myValue = someService.getValue();
        jobExecution.getExecutionContext().putString("MY_VALUE", myValue);
    }
}
  1. 작업 컨텍스트에서 리스너를 정의했습니다.
<listeners>
         <listener ref="myJobListener"/>
</listeners>
  1. BeforeStep 주석을 사용하여 단계의 값을 사용했습니다.
@BeforeStep
public void initializeValues(StepExecution stepExecution) {

String value = stepExecution.getJobExecution().getExecutionContext().getString("MY_VALUE");

}

단순 개체에 데이터를 저장할 수 있습니다. 처럼:

AnyObject yourObject = new AnyObject();

public Job build(Step step1, Step step2) {
    return jobBuilderFactory.get("jobName")
            .incrementer(new RunIdIncrementer())
            .start(step1)
            .next(step2)
            .build();
}

public Step step1() {
    return stepBuilderFactory.get("step1Name")
            .<Some, Any> chunk(someInteger1)
            .reader(itemReader1())
            .processor(itemProcessor1())
            .writer(itemWriter1(yourObject))
            .build();
}

public Step step2() {
    return stepBuilderFactory.get("step2Name")
            .<Some, Any> chunk(someInteger2)
            .reader(itemReader2())
            .processor(itemProcessor2(yourObject))
            .writer(itemWriter2())
            .build();
}

작성기 또는 다른 방법에서 개체에 데이터를 추가하고 다음 단계의 모든 단계에서 가져옵니다.


`ExecutionContextPromotionListener를 사용하십시오.

public class YourItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(List<? extends Object> items) throws Exception {
// Some Business Logic

// put your data into stepexecution context
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(Final StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}

이제 작업에 promotionListener를 추가해야합니다.

@Bean
public Step step1() {
        return stepBuilder
        .get("step1")<Company,Company>  chunk(10)
        .reader(reader()).processor(processor()).writer(writer())
        .listener(promotionListener()).build();
    }

@Bean
public ExecutionContextPromotionListener promotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys(new String[] {"someKey"});
    listener.setStrict(true);
    return listener;
}

이제 2 단계에서 작업 ExecutionContext에서 데이터를 가져옵니다.

public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(List<? extends Object> items) throws Exception {
// ...
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}

tasklet으로 작업하는 경우 다음을 사용하여 ExecutionContext를 가져 오거나 넣습니다.

List<YourObject> yourObjects = (List<YourObject>) chunkContent.getStepContext().getJobExecutionContext().get("someKey");

배치 작업을 하나씩 호출하는 작업이 주어졌으며 각 작업은 다른 작업에 의존합니다. 첫 번째 작업 결과는 후속 작업 프로그램을 실행해야합니다. 작업 실행 후 데이터를 전달하는 방법을 찾고있었습니다. 이 ExecutionContextPromotionListener가 유용하다는 것을 알았습니다.

1) 아래와 같이 "ExecutionContextPromotionListener"빈을 추가했습니다.

@Bean
public ExecutionContextPromotionListener promotionListener()
{
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys( new String[] { "entityRef" } );
    return listener;
}

2) 그런 다음 청취자 중 하나를 내 Steps에 연결했습니다.

Step step = builder.faultTolerant()
            .skipPolicy( policy )
            .listener( writer )
            .listener( promotionListener() )
            .listener( skiplistener )
            .stream( skiplistener )
            .build();

3) Writer 단계 구현에서 stepExecution을 참조로 추가하고 Beforestep에 채웠습니다.

@BeforeStep
public void saveStepExecution( StepExecution stepExecution )
{
    this.stepExecution = stepExecution;
}   

4) in the end of my writer step, i populated the values in the stepexecution as the keys like below

lStepContext.put( "entityRef", lMap );

5) After the job execution, I retrieved the values from the lExecution.getExecutionContext() and populated as job response.

6) from the job response object, I will get the values and populate the required values in the rest of the jobs.

The above code is for promoting the data from the steps to ExecutionContext using ExecutionContextPromotionListener. It can done for in any steps.


Spring Batch creates metadata tables for itself (like batch_job_execution, batch_job_execution_context, batch_step_instance, etc).

And I have tested (using postgres DB) that you can have at least 51,428 chars worth of data in one column (batch_job_execution_context.serialized_content). It could be more, it is just how much I tested.

When you are using Tasklets for your step (like class MyTasklet implements Tasklet) and override the RepeatStatus method in there, you have immediate access to ChunkContext.

class MyTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(@NonNull StepContribution contribution, 
                                @NonNull ChunkContext chunkContext) {
        List<MyObject> myObjects = getObjectsFromSomewhereAndUseThemInNextStep();
        chunkContext.getStepContext().getStepExecution()
        .getJobExecution()
        .getExecutionContext()
        .put("mydatakey", myObjects);
    }
}

And now you have another step with a different Tasklet where you can access those objects

class MyOtherTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(@NonNull StepContribution contribution, 
                                @NonNull ChunkContext chunkContext) {
        List<MyObject> myObjects = (List<MyObject>) 
        chunkContext.getStepContext().getStepExecution()
        .getJobExecution()
        .getExecutionContext()
        .get("mydatakey"); 
    }
}

Or if you dont have a Tasklet and have like a Reader/Writer/Processor, then

class MyReader implements ItemReader<MyObject> {

    @Value("#{jobExecutionContext['mydatakey']}")
    List<MyObject> myObjects;
    // And now myObjects are available in here

    @Override
    public MyObject read() throws Exception {

    }
}

As Nenad Bozic said in his 3rd option, use temp tables to share the data between steps, using context to share also does same thing, it writes to table and loads back in next step, but if you write into temp tables you can clean at the end of job.


Another very simply approach, leaving here for future reference:

class MyTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
        getExecutionContext.put("foo", "bar");
    }
}

class MyOtherTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
        getExecutionContext.get("foo");
    }   
}

getExecutionContexthere is:

ExecutionContext getExecutionContext(ChunkContext chunkContext) {
    return chunkContext.getStepContext()
                       .getStepExecution()
                       .getJobExecution()
                       .getExecutionContext();
}     

Put it in a super class, in an interface as a default method, or simply paste in your Tasklets.

ReferenceURL : https://stackoverflow.com/questions/2292667/how-can-we-share-data-between-the-different-steps-of-a-job-in-spring-batch

반응형