Commit ae41635f authored by Combava Orange's avatar Combava Orange Committed by Bergamote Orange
Browse files

[clea-batch] NoPersistenceBatchConfigurer serialize no context

new step to generate cluster index only one time
parent 3cef94bb
......@@ -17,14 +17,14 @@ public class CleaBatchJobConfig {
private JobBuilderFactory jobBuilderFactory;
@Bean
public Job cleaBatchJob(Step clusterIdentification, Step clustersIndexation, Step prefixesComputing, Step emptyIntermediateDb) {
public Job cleaBatchJob(Step clusterIdentification, Step clustersIndexation, Step prefixesComputing, Step emptyIntermediateDb, Step clusterIndexGeneration) {
return this.jobBuilderFactory.get("clea-batch-job")
.incrementer(new RunIdIncrementer())
.start(emptyIntermediateDb)
.next(clusterIdentification)
.next(prefixesComputing)
.next(clustersIndexation)
.next(clusterIndexGeneration)
.build();
}
}
package fr.gouv.clea.config;
import fr.gouv.clea.indexation.index.GenerateClusterIndexTasklet;
import fr.gouv.clea.service.PrefixesStorageService;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ClusterIndexGenerationStepBatchConfig {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private PrefixesStorageService prefixesStorageService;
@Autowired
private BatchProperties batchProperties;
@Bean
public Step clusterIndexGeneration() {
return stepBuilderFactory.get("clusterIndexGeneration")
.tasklet(generateClusterIndex())
.build();
}
@Bean
public Tasklet generateClusterIndex() {
return new GenerateClusterIndexTasklet(batchProperties, prefixesStorageService);
}
}
......@@ -94,7 +94,7 @@ public class IndexationStepBatchConfig {
@Bean
public IndexationWriter indexationWriter() {
return new IndexationWriter(properties, prefixesStorageService);
return new IndexationWriter(properties);
}
@Bean
......
package fr.gouv.clea.config;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.repository.ExecutionContextSerializer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import javax.sql.DataSource;
@Component
public class NoPersistenceBatchConfigurer extends DefaultBatchConfigurer {
@Override
public void setDataSource(DataSource dataSource) {
// prevents spring batch from storing JobRepository in database and use in-memory map instead
}
private DataSource dataSource;
@Autowired(required = false)
@Override
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
super.setDataSource(dataSource);
}
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(getTransactionManager());
// this serializer ignore input so executionContext store Null values
factory.setSerializer(new ExecutionContextSerializer() {
@Override
public Map<String, Object> deserialize(InputStream inputStream) throws IOException {
return null;
}
@Override
public void serialize(Map<String, Object> object, OutputStream outputStream) throws IOException {
// Noop
}
});
factory.afterPropertiesSet();
return factory.getObject();
}
}
package fr.gouv.clea.indexation.index;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import fr.gouv.clea.config.BatchProperties;
import fr.gouv.clea.indexation.model.output.ClusterFileIndex;
import fr.gouv.clea.service.PrefixesStorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Set;
@Slf4j
public class GenerateClusterIndexTasklet implements Tasklet {
private final PrefixesStorageService prefixesStorageService;
private final String outputPath;
public GenerateClusterIndexTasklet(final BatchProperties batchProperties, PrefixesStorageService prefixesStorageService) {
this.outputPath = batchProperties.getFilesOutputPath();
this.prefixesStorageService = prefixesStorageService;
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
generateClusterIndex(chunkContext.getStepContext().getJobInstanceId(), prefixesStorageService.getPrefixWithAssociatedLtidsMap().keySet());
return RepeatStatus.FINISHED;
}
private void generateClusterIndex(final Long jobId, final Set<String> prefixes) throws IOException {
ClusterFileIndex clusterFileIndex = ClusterFileIndex.builder()
.iteration(jobId.intValue())
.prefixes(prefixes)
.build();
log.info("Generating cluster index : " + outputPath + File.separator + "clusterIndex.json");
Path jsonPath = Paths.get(outputPath + File.separator + "clusterIndex.json");
File jsonIndex = jsonPath.toFile();
ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT);
mapper.writeValue(jsonIndex, clusterFileIndex);
}
}
......@@ -29,21 +29,13 @@ public class IndexationWriter implements ItemWriter<ClusterFile> {
private Long jobId;
private final PrefixesStorageService prefixesStorageService;
@BeforeStep
public void retrieveInterStepData(final StepExecution stepExecution) {
this.jobId = stepExecution.getJobExecutionId();
}
@AfterStep
public void createClusterIndex(final StepExecution stepExecution) throws IOException {
generateClusterIndex(prefixesStorageService.getPrefixWithAssociatedLtidsMap().keySet());
}
public IndexationWriter(final BatchProperties config, final PrefixesStorageService prefixesStorageService) {
public IndexationWriter(final BatchProperties config) {
this.outputPath = config.getFilesOutputPath();
this.prefixesStorageService = prefixesStorageService;
}
@Override
......@@ -72,19 +64,4 @@ public class IndexationWriter implements ItemWriter<ClusterFile> {
}
}
private void generateClusterIndex(final Set<String> prefixes) throws IOException {
ClusterFileIndex clusterFileIndex = ClusterFileIndex.builder()
.iteration(jobId.intValue())
.prefixes(prefixes)
.build();
log.info("Generating cluster index : " + outputPath + File.separator + "clusterIndex.json");
Path jsonPath = Paths.get(outputPath + File.separator + "clusterIndex.json");
File jsonIndex = jsonPath.toFile();
ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT);
mapper.writeValue(jsonIndex, clusterFileIndex);
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment