Commit b465e8ce authored by Bergamote Orange

batch v2 partitioning in indexation + packages cleaning

correct config for partitioned step
parent ab754bda
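The partitioned indexation introduced by this commit hands each worker step a slice of the prefix-to-ltids map through its step execution context. The sketch below is illustrative only: the context keys "prefixes" and "ltids" match the step-scoped reader further down in this diff, but the values are made-up placeholders and the real grouping logic lives in IndexationPartitioner, which is only partially shown here.

// Illustrative only: shows the execution-context layout the step-scoped indexation reader expects.
// The actual partitioning logic in IndexationPartitioner is not reproduced here.
import org.springframework.batch.item.ExecutionContext;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartitionLayoutExample {

    static Map<String, ExecutionContext> examplePartitions() {
        final Map<String, ExecutionContext> partitions = new HashMap<>();
        final ExecutionContext context = new ExecutionContext();
        // each partition carries a list of prefixes and, for each prefix, the list of its ltids
        context.put("prefixes", List.of("ab", "cd"));
        context.put("ltids", List.of(
                List.of("ab-ltid-1", "ab-ltid-2"),
                List.of("cd-ltid-1")));
        partitions.put("partition-0", context);
        return partitions;
    }
}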
package fr.gouv.clea.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class CleaBatchJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Bean
public Job cleaBatchJob(Step clusterIdentification, Step clustersIndexation, Step prefixesComputing, Step emptyIntermediateDb) {
return this.jobBuilderFactory.get("clea-batch-job")
.incrementer(new RunIdIncrementer())
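// step order: purge intermediate data, identify clusters, compute prefixes, then build the index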
.start(emptyIntermediateDb)
.next(clusterIdentification)
.next(prefixesComputing)
.next(clustersIndexation)
.build();
}
}
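For reference, a minimal sketch of launching this job programmatically, assuming a plain JobLauncher injection; this runner class and its parameter name are illustrative and not part of the commit (in a Spring Boot application the job can also be launched automatically at startup).

// Illustrative launcher; not part of this commit. The unique timestamp parameter
// ensures each direct JobLauncher call creates a new job instance.
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class CleaBatchJobLauncherExample {

    @Bean
    CommandLineRunner runCleaBatchJob(JobLauncher jobLauncher, Job cleaBatchJob) {
        return args -> {
            JobParameters parameters = new JobParametersBuilder()
                    .addLong("launch.timestamp", System.currentTimeMillis())
                    .toJobParameters();
            jobLauncher.run(cleaBatchJob, parameters);
        };
    }
}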
package fr.gouv.clea.config;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import static fr.gouv.clea.config.BatchConstants.SINGLE_PLACE_CLUSTER_PERIOD_TABLE;
@Configuration
public class EmptyIntermediateDbStepBatchConfig {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Bean
public Step emptyIntermediateDb() {
return stepBuilderFactory.get("emptyIntermediateDb")
.tasklet(emptyDb())
.build();
}
@Bean
public Tasklet emptyDb() {
return (contribution, chunkContext) -> {
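// purge the intermediate cluster-period table so each run starts from a clean state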
JdbcOperations jdbcTemplate = new JdbcTemplate(dataSource);
jdbcTemplate.execute("truncate " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE + ";");
return RepeatStatus.FINISHED;
};
}
}
@@ -9,33 +9,14 @@ import fr.gouv.clea.identification.processor.ClusterToPeriodsProcessor;
import fr.gouv.clea.identification.processor.SinglePlaceExposedVisitsBuilder;
import fr.gouv.clea.identification.processor.SinglePlaceExposedVisitsProcessor;
import fr.gouv.clea.identification.writer.SinglePlaceClusterPeriodListWriter;
import fr.gouv.clea.indexation.IndexationPartitioner;
import fr.gouv.clea.indexation.model.output.ClusterFile;
import fr.gouv.clea.indexation.processors.SinglePlaceClusterBuilder;
import fr.gouv.clea.indexation.readers.StepExecutionContextReader;
import fr.gouv.clea.indexation.writers.IndexationWriter;
import fr.gouv.clea.init.EmptyIntermediateDBTasklet;
import fr.gouv.clea.mapper.SinglePlaceClusterPeriodMapper;
import fr.gouv.clea.prefixes.ListItemReader;
import fr.gouv.clea.prefixes.PrefixesMemoryWriter;
import fr.gouv.clea.prefixes.PrefixesStorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import fr.gouv.clea.mapper.ClusterPeriodModelsMapper;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@@ -43,26 +24,16 @@ import org.springframework.core.task.TaskExecutor;
import javax.sql.DataSource;
import java.util.List;
import java.util.Map;
import static fr.gouv.clea.config.BatchConstants.*;
import static fr.gouv.clea.config.BatchConstants.EXPOSED_VISITS_TABLE;
import static fr.gouv.clea.config.BatchConstants.LTID_COL;
@Slf4j
@Configuration
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
public class IdentificationStepBatchConfig {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private RiskConfigurationService riskConfigurationService;
@Autowired
private PrefixesStorageService prefixesStorageService;
@Autowired
private BatchProperties properties;
@@ -70,48 +41,10 @@ public class BatchConfig {
private DataSource dataSource;
@Autowired
private SinglePlaceClusterPeriodMapper mapper;
@Bean
public Job identificationJob() {
return this.jobBuilderFactory.get("identificationJob")
.incrementer(new RunIdIncrementer())
.start(emptyIntermediateDBstep())
.next(clusterIdentification())
.next(prefixesComputing())
.next(masterStepIndexation())
.build();
}
@Bean
public JdbcCursorItemReader<String> ltidDBReader() {
JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setSql("select distinct " + LTID_COL + " from " + EXPOSED_VISITS_TABLE + " order by " + LTID_COL);
reader.setRowMapper((rs, i) -> rs.getString(1));
return reader;
}
/**
* Aggregates elements from the delegate reader into a single list and returns them all at once.
*
* @return a reader providing the full list of ltids as strings
*/
@Bean
public ItemReader<List<String>> ltidListDBReader() {
JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();
reader.setSaveState(false);
reader.setDataSource(dataSource);
reader.setVerifyCursorPosition(false);
reader.setSql("select distinct " + LTID_COL + " from " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE + " ORDER BY " + LTID_COL);
reader.setRowMapper((rs, i) -> rs.getString(1));
return new ListItemReader(reader);
}
private ClusterPeriodModelsMapper mapper;
// =================
// Cluster identification & indexation steps
// =================
@Autowired
private RiskConfigurationService riskConfigurationService;
@Bean
public Step clusterIdentification() {
@@ -123,8 +56,8 @@ public class BatchConfig {
);
final SynchronizedItemStreamReader<String> reader = new SynchronizedItemStreamReader<>();
reader.setDelegate(ltidDBReader());
return stepBuilderFactory.get("identification")
reader.setDelegate(identificationStepReader());
return stepBuilderFactory.get("clusterIdentification")
.<String, List<SinglePlaceClusterPeriod>>chunk(1000)
.reader(reader)
.processor(compositeProcessor)
@@ -134,73 +67,35 @@ public class BatchConfig {
.build();
}
@Bean
public Step emptyIntermediateDBstep() {
return stepBuilderFactory.get("emptyIntermediateDB")
.tasklet(emptyIntermediateDB())
.build();
}
@Bean
public Step prefixesComputing() {
return stepBuilderFactory.get("prefixes")
.<List<String>, List<String>>chunk(1000)
.reader(ltidListDBReader())
.writer(new PrefixesMemoryWriter(properties, prefixesStorageService))
.build();
}
@Bean
public Step clusterIndexation() {
// MemoryMapItemReader reader = new MemoryMapItemReader((prefixesStorageService.getPrefixWithAssociatedLtidsMap().entrySet())::iterator);
return stepBuilderFactory.get("indexation")
//FIXME: set config for chunk size
.<Map.Entry<String, List<String>>, ClusterFile>chunk(1)
.reader(memoryMapItemReader(null, null))
.processor(singlePlaceClusterBuilder()) // build a Map of ClusterFile at once
.writer(indexationWriter()) // build Files and index
.build();
}
// =================
// Identification step reader
// =================
@Bean
public IndexationWriter indexationWriter() {
return new IndexationWriter(properties, prefixesStorageService);
}
public JdbcCursorItemReader<String> identificationStepReader() {
@Bean
public ItemProcessor<Map.Entry<String, List<String>>, ClusterFile> singlePlaceClusterBuilder() {
return new SinglePlaceClusterBuilder(dataSource, mapper, properties);
JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setSql("select distinct " + LTID_COL + " from " + EXPOSED_VISITS_TABLE + " order by " + LTID_COL);
reader.setRowMapper((rs, i) -> rs.getString(1));
return reader;
}
@Bean
@StepScope
public ItemReader<Map.Entry<String, List<String>>> memoryMapItemReader(@Value("#{stepExecutionContext['prefix']}") String prefix,
@Value("#{stepExecutionContext['ltids']}") List<String> ltids) {
return new StepExecutionContextReader(prefix, ltids);
public ItemProcessor<String, SinglePlaceExposedVisits> exposedVisitBuilder() {
return new SinglePlaceExposedVisitsBuilder(dataSource, new ExposedVisitRowMapper());
}
@Bean
public TaskExecutorPartitionHandler partitionHandler() {
final TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
//FIXME: set config for gridSize
partitionHandler.setGridSize(5);
partitionHandler.setStep(clusterIndexation());
partitionHandler.setTaskExecutor(new SimpleAsyncTaskExecutor());
return partitionHandler;
}
@Bean
public Step masterStepIndexation() {
return this.stepBuilderFactory.get("indexationMaster")
.partitioner("partitioner", prefixPartitioner())
.partitionHandler(partitionHandler())
.build();
public ItemProcessor<SinglePlaceExposedVisits, SinglePlaceCluster> singleClusterPlaceBuilder() {
return new SinglePlaceExposedVisitsProcessor(properties, riskConfigurationService);
}
@Bean
public Partitioner prefixPartitioner() {
log.info("callToPartitioner");
return new IndexationPartitioner(prefixesStorageService);
public ItemProcessor<SinglePlaceCluster, List<SinglePlaceClusterPeriod>> singlePlaceClusterPeriodListBuilder() {
return new ClusterToPeriodsProcessor(mapper);
}
@Bean
@@ -208,23 +103,4 @@ public class BatchConfig {
return new SimpleAsyncTaskExecutor("batch-ident");
}
@Bean
public Tasklet emptyIntermediateDB() {
return new EmptyIntermediateDBTasklet(dataSource);
}
@Bean
public ItemProcessor<String, SinglePlaceExposedVisits> exposedVisitBuilder() {
return new SinglePlaceExposedVisitsBuilder(dataSource, new ExposedVisitRowMapper());
}
@Bean
public ItemProcessor<SinglePlaceExposedVisits, SinglePlaceCluster> singleClusterPlaceBuilder() {
return new SinglePlaceExposedVisitsProcessor(properties, riskConfigurationService);
}
@Bean
public ItemProcessor<SinglePlaceCluster, List<SinglePlaceClusterPeriod>> singlePlaceClusterPeriodListBuilder() {
return new ClusterToPeriodsProcessor(mapper);
}
}
package fr.gouv.clea.config;
import fr.gouv.clea.indexation.IndexationPartitioner;
import fr.gouv.clea.indexation.model.output.ClusterFile;
import fr.gouv.clea.indexation.processor.SinglePlaceClusterBuilder;
import fr.gouv.clea.indexation.reader.StepExecutionContextReader;
import fr.gouv.clea.indexation.writer.IndexationWriter;
import fr.gouv.clea.mapper.ClusterPeriodModelsMapper;
import fr.gouv.clea.service.PrefixesStorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import javax.sql.DataSource;
import java.util.List;
import java.util.Map;
@Slf4j
@Configuration
public class IndexationStepBatchConfig {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private PrefixesStorageService prefixesStorageService;
@Autowired
private BatchProperties properties;
@Autowired
private DataSource dataSource;
@Autowired
private ClusterPeriodModelsMapper mapper;
@Bean
public Step clustersIndexation() {
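// master step: splits the prefix/ltids map into partitions and delegates them to partitionedClustersIndexation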
return this.stepBuilderFactory.get("indexationMaster")
.partitioner("partitioner", prefixPartitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public Partitioner prefixPartitioner() {
log.info("callToPartitioner");
return new IndexationPartitioner(prefixesStorageService);
}
@Bean
public TaskExecutorPartitionHandler partitionHandler() {
final TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
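// one worker step execution per partition, run in parallel on the dedicated indexation task executor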
partitionHandler.setGridSize(properties.getGridSize());
partitionHandler.setStep(partitionedClustersIndexation());
partitionHandler.setTaskExecutor(indexationTaskExecutor());
return partitionHandler;
}
@Bean
public Step partitionedClustersIndexation() {
return stepBuilderFactory.get("clustersIndexation")
//FIXME: set config for chunk size
.<Map.Entry<String, List<String>>, ClusterFile>chunk(1000)
.reader(memoryMapItemReader(null, null))
.processor(singlePlaceClusterBuilder()) // builds one ClusterFile per prefix entry
.writer(indexationWriter()) // writes the cluster files and the index
.build();
}
@Bean
@StepScope
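// step-scoped: each partition gets its own reader, bound to the "prefixes" and "ltids" lists of that partition's execution context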
public ItemReader<Map.Entry<String, List<String>>> memoryMapItemReader(
@Value("#{stepExecutionContext['prefixes']}") List<String> prefixes,
@Value("#{stepExecutionContext['ltids']}") List<List<String>> ltids) {
return new StepExecutionContextReader(prefixes, ltids);
}
@Bean
public ItemProcessor<Map.Entry<String, List<String>>, ClusterFile> singlePlaceClusterBuilder() {
return new SinglePlaceClusterBuilder(dataSource, mapper, properties);
}
@Bean
public IndexationWriter indexationWriter() {
return new IndexationWriter(properties, prefixesStorageService);
}
@Bean
public SimpleAsyncTaskExecutor indexationTaskExecutor() {
return new SimpleAsyncTaskExecutor("batch-index");
}
}
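The grid size is no longer hard-coded (the old FIXME) but read from BatchProperties, which is not part of this diff. A minimal sketch of what that binding could look like, assuming Spring Boot @ConfigurationProperties and an illustrative "clea.batch" prefix; only the gridSize field used above is shown.

// Sketch only: the real BatchProperties class is not shown in this commit;
// the "clea.batch" prefix and the use of @ConfigurationProperties are assumptions.
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "clea.batch")
class BatchPropertiesSketch {

    // number of partitions requested from the indexation partitioner
    private int gridSize = 5;
}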
package fr.gouv.clea.config;
import fr.gouv.clea.prefixes.ListItemReader;
import fr.gouv.clea.prefixes.PrefixesMemoryWriter;
import fr.gouv.clea.service.PrefixesStorageService;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.List;
import static fr.gouv.clea.config.BatchConstants.LTID_COL;
import static fr.gouv.clea.config.BatchConstants.SINGLE_PLACE_CLUSTER_PERIOD_TABLE;
@Configuration
public class PrefixesStepBatchConfig {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Autowired
private PrefixesStorageService prefixesStorageService;
@Autowired
private BatchProperties properties;
@Bean
public Step prefixesComputing() {
return stepBuilderFactory.get("prefixesComputing")
.<List<String>, List<String>>chunk(1000)
.reader(ltidListDBReader())
.writer(new PrefixesMemoryWriter(properties, prefixesStorageService))
.build();
}
/**
* Aggregates elements from the delegate reader into a single list and returns them all at once
* (a sketch of such a reader follows this class).
*
* @return a reader providing the full list of ltids as strings
*/
@Bean
public ItemReader<List<String>> ltidListDBReader() {
JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();
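// no restart state is saved and cursor position checks are disabled for this delegate reader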
reader.setSaveState(false);
reader.setDataSource(dataSource);
reader.setVerifyCursorPosition(false);
reader.setSql("select distinct " + LTID_COL + " from " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE + " ORDER BY " + LTID_COL);
reader.setRowMapper((rs, i) -> rs.getString(1));
return new ListItemReader(reader);
}
}
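The ListItemReader used by ltidListDBReader() is not included in this diff. A minimal sketch, assuming it simply drains its JdbcCursorItemReader delegate and emits the whole result set as one list per step execution; the real class may differ.

// Sketch of an aggregating reader; the actual fr.gouv.clea.prefixes.ListItemReader is not shown in this commit.
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcCursorItemReader;

import java.util.ArrayList;
import java.util.List;

class ListItemReaderSketch implements ItemReader<List<String>> {

    private final JdbcCursorItemReader<String> delegate;
    private boolean consumed = false;

    ListItemReaderSketch(JdbcCursorItemReader<String> delegate) {
        this.delegate = delegate;
    }

    @Override
    public List<String> read() throws Exception {
        if (consumed) {
            return null; // signal end of input after the single aggregated list
        }
        final List<String> items = new ArrayList<>();
        delegate.open(new ExecutionContext());
        try {
            String item;
            while ((item = delegate.read()) != null) {
                items.add(item);
            }
        } finally {
            delegate.close();
        }
        consumed = true;
        return items.isEmpty() ? null : items;
    }
}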
package fr.gouv.clea.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import fr.gouv.clea.entity.ExposedVisit;
import lombok.AllArgsConstructor;
import lombok.Builder;
......
package fr.gouv.clea.dto;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import fr.gouv.clea.entity.ExposedVisit;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@Data
@NoArgsConstructor
@AllArgsConstructor
......
@@ -2,9 +2,8 @@ package fr.gouv.clea.identification.processor;
import fr.gouv.clea.dto.SinglePlaceCluster;
import fr.gouv.clea.dto.SinglePlaceClusterPeriod;
import fr.gouv.clea.mapper.SinglePlaceClusterPeriodMapper;
import fr.gouv.clea.mapper.ClusterPeriodModelsMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import java.util.List;
@@ -13,7 +12,7 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor
public class ClusterToPeriodsProcessor implements ItemProcessor<SinglePlaceCluster, List<SinglePlaceClusterPeriod>> {
private final SinglePlaceClusterPeriodMapper mapper;
private final ClusterPeriodModelsMapper mapper;
@Override
public List<SinglePlaceClusterPeriod> process(SinglePlaceCluster cluster) {
......
package fr.gouv.clea.identification.writer;
import fr.gouv.clea.dto.SinglePlaceClusterPeriod;
import fr.gouv.clea.mapper.SinglePlaceClusterPeriodMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.jdbc.core.namedparam.*;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils;
import javax.sql.DataSource;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static fr.gouv.clea.config.BatchConstants.*;
......
package fr.gouv.clea.indexation;
import fr.gouv.clea.prefixes.PrefixesStorageService;
import fr.gouv.clea.service.PrefixesStorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static fr.gouv.clea.config.BatchConstants.LTIDS_PARAM;
import java.util.*;
@Slf4j
public class IndexationPartitioner implements Partitioner {
@@ -32,16 +28,41 @@ public class IndexationPartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
log.info("Computing indexation partitions...");
final Map<String, List<String>> map = prefixesStorageService.getPrefixWithAssociatedLtidsMap();
final Iterator<Map.Entry<String, List<String>>> mapIterator = map.entrySet().iterator();
final Map<String, ExecutionContext> result = new HashMap<>();