Commit 6f4cad89 authored by Figue Orange, committed by Bergamote Orange

refactor cluster file and index generation

alternative map reader version

fix batch first step writer to handle all input lists
parent 08e9d265
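The refactor replaces the `PrefixesMemoryReader` stub with an iterator-backed reader over the in-memory prefix map built by the first step. A minimal sketch of the new wiring, using only names that appear in this diff:

```java
// Defer iterator creation with a method reference so the map is consumed
// only after the "prefixes" step has finished populating it.
Map<String, List<String>> prefixMap =
        prefixesStorageService.getPrefixWithAssociatedLtidsMap();
MemoryMapItemReader reader = new MemoryMapItemReader(prefixMap.entrySet()::iterator);
```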
......@@ -4,9 +4,10 @@ import fr.gouv.clea.dto.SinglePlaceCluster;
import fr.gouv.clea.dto.SinglePlaceClusterPeriod;
import fr.gouv.clea.dto.SinglePlaceExposedVisits;
import fr.gouv.clea.identification.*;
import fr.gouv.clea.indexation.model.output.ClusterFileItem;
import fr.gouv.clea.indexation.model.output.ClusterFile;
import fr.gouv.clea.indexation.processors.SinglePlaceClusterBuilder;
import fr.gouv.clea.indexation.readers.PrefixesMemoryReader;
import fr.gouv.clea.indexation.readers.IteratorItemReader;
import fr.gouv.clea.indexation.readers.MemoryMapItemReader;
import fr.gouv.clea.indexation.writers.IndexationWriter;
import fr.gouv.clea.mapper.SinglePlaceClusterPeriodMapper;
import fr.gouv.clea.prefixes.ListItemReader;
......@@ -29,7 +30,10 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import javax.sql.DataSource;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import static fr.gouv.clea.config.BatchConstants.*;
......@@ -86,6 +90,7 @@ public class BatchConfig {
@Bean
public ItemReader<List<String>> ltidListDBReader() {
JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();
reader.setSaveState(false);
reader.setDataSource(dataSource);
reader.setVerifyCursorPosition(false);
reader.setSql("select distinct " + LTID_COL + " from " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE + " ORDER BY " + LTID_COL);
......@@ -121,7 +126,6 @@ public class BatchConfig {
return stepBuilderFactory.get("prefixes")
.<List<String>, List<String>>chunk(1000)
.reader(ltidListDBReader())
// .processor(new PrefixesComputingProcessor(properties, prefixesStorageService))
.writer(new PrefixesMemoryWriter(properties, prefixesStorageService))
.taskExecutor(taskExecutor())
.throttleLimit(10)
......@@ -130,11 +134,12 @@ public class BatchConfig {
@Bean
public Step clusterIndexation() {
MemoryMapItemReader reader = new MemoryMapItemReader((prefixesStorageService.getPrefixWithAssociatedLtidsMap().entrySet())::iterator);
return stepBuilderFactory.get("indexation")
.<String, ClusterFileItem>chunk(1)
.reader(new PrefixesMemoryReader())
.processor(new SinglePlaceClusterBuilder(dataSource, mapper)) // build a Map of ClusterFile at once
.writer(new IndexationWriter(properties)) // build Files and index
.<Map.Entry<String, List<String>>, ClusterFile>chunk(1)
.reader(reader)
.processor(new SinglePlaceClusterBuilder(dataSource, mapper, properties)) // build a Map of ClusterFile at once
.writer(new IndexationWriter(properties, prefixesStorageService)) // build Files and index
.build();
}
......
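With a chunk size of 1, each map entry flows through the indexation step one prefix at a time. Conceptually (a sketch of the data flow, not Spring Batch internals; "a1" is a made-up prefix):

```java
Map.Entry<String, List<String>> entry = reader.read(); // e.g. "a1" -> [ltid1, ltid2, ...]
ClusterFile file = processor.process(entry);           // one ClusterFileItem per LTID
writer.write(List.of(file));                           // writes <outputPath>/<jobId>/a1.json
```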
......@@ -7,12 +7,10 @@ public class BatchConstants {
public static final String EXPOSED_VISITS_TABLE = "exposed_visits";
public static final String SINGLE_PLACE_CLUSTER_PERIOD_TABLE = "cluster_periods";
public static final String SINGLE_PLACE_CLUSTER_TABLE = "cluster_periods";
public static final String PERIOD_COLUMN = "period_start";
public static final String TIMESLOT_COLUMN = "timeslot";
public static final String LTID_PARAM = "ltid";
public static final String CLUSTERMAP_JOB_CONTEXT_KEY = "clusterMap";
public static final String LTID_COL = "ltid";
public static final String VENUE_TYPE_COL = "venue_type";
......
......@@ -26,14 +26,16 @@ public class SinglePlaceClusterPeriodListWriter extends JdbcBatchItemWriter<List
}
@Override
public void write(List<? extends List<SinglePlaceClusterPeriod>> list) {
list.get(0).forEach(singlePlaceClusterPeriod -> {
final BeanPropertySqlParameterSource parameterSource = new BeanPropertySqlParameterSource(singlePlaceClusterPeriod);
jdbcTemplate.update(getInsertSql(), parameterSource);
});
public void write(List<? extends List<SinglePlaceClusterPeriod>> lists) {
lists.forEach(list ->
list.forEach(singlePlaceClusterPeriod -> {
final BeanPropertySqlParameterSource parameterSource = new BeanPropertySqlParameterSource(singlePlaceClusterPeriod);
jdbcTemplate.update(getInsertSql(), parameterSource);
}));
}
private String getInsertSql() {
// values as parameters from SinglePlaceClusterPeriod attributes
return "insert into " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE +
// column names
" (" +
......@@ -47,10 +49,7 @@ public class SinglePlaceClusterPeriodListWriter extends JdbcBatchItemWriter<List
CLUSTER_START_COL,
CLUSTER_DURATION_COL,
RISK_LEVEL_COL) +
")" +
" values " +
// values as parameters from SinglePlaceClusterPeriod attributes
"(" +
") values (" +
String.join(", ",
":locationTemporaryPublicId",
":venueType", ":venueCategory1",
......
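The `write()` change above fixes a chunk-handling bug: Spring Batch hands the writer every item of the chunk, and the old body consumed only `list.get(0)`, silently dropping the rest. An equivalent, flatter formulation of the new loop (a sketch, not the committed code):

```java
@Override
public void write(List<? extends List<SinglePlaceClusterPeriod>> lists) {
    // Flatten the chunk: one insert per period, across every input list.
    lists.stream()
            .flatMap(List::stream)
            .forEach(period -> jdbcTemplate.update(
                    getInsertSql(), new BeanPropertySqlParameterSource(period)));
}
```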
package fr.gouv.clea.indexation.model.output;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import java.util.ArrayList;
import java.util.List;
import lombok.*;
@Data
@ToString
@EqualsAndHashCode
public class ClusterFile {
private String name;
private List<ClusterFileItem> items;
public ClusterFile() {
......
package fr.gouv.clea.indexation.processors;
import fr.gouv.clea.config.BatchProperties;
import fr.gouv.clea.dto.ClusterPeriod;
import fr.gouv.clea.dto.SinglePlaceCluster;
import fr.gouv.clea.dto.SinglePlaceClusterPeriod;
import fr.gouv.clea.indexation.SinglePlaceClusterPeriodRowMapper;
import fr.gouv.clea.indexation.model.output.ClusterFile;
import fr.gouv.clea.indexation.model.output.ClusterFileItem;
import fr.gouv.clea.indexation.model.output.Prefix;
import fr.gouv.clea.mapper.SinglePlaceClusterPeriodMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
......@@ -12,6 +15,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
......@@ -20,38 +24,55 @@ import static fr.gouv.clea.config.BatchConstants.LTID_COL;
import static fr.gouv.clea.config.BatchConstants.SINGLE_PLACE_CLUSTER_PERIOD_TABLE;
@Slf4j
public class SinglePlaceClusterBuilder implements ItemProcessor<String, ClusterFileItem> {
public class SinglePlaceClusterBuilder implements ItemProcessor<Map.Entry<String, List<String>>, ClusterFile> {
private final JdbcTemplate jdbcTemplate;
private final SinglePlaceClusterPeriodMapper mapper;
private final BatchProperties properties;
AtomicLong counter = new AtomicLong();
public SinglePlaceClusterBuilder(DataSource dataSource, SinglePlaceClusterPeriodMapper mapper) {
public SinglePlaceClusterBuilder(
final DataSource dataSource,
final SinglePlaceClusterPeriodMapper mapper,
final BatchProperties properties) {
jdbcTemplate = new JdbcTemplate(dataSource);
this.mapper = mapper;
this.properties = properties;
}
@Override
public ClusterFileItem process(final String ltid) {
final List<SinglePlaceClusterPeriod> clusterPeriodList = jdbcTemplate.query("select * from " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE
+ " WHERE ltid= ? ORDER BY " + LTID_COL,
new SinglePlaceClusterPeriodRowMapper(), UUID.fromString(ltid));
SinglePlaceClusterPeriod singlePlaceClusterPeriod = clusterPeriodList.stream().findFirst().orElse(null);
if (null != singlePlaceClusterPeriod) {
long ln = counter.incrementAndGet();
if (0 == ln % 1000) {
log.info("Loaded {} singlePlaceClusterPeriod, current LTId={} ", ln, ltid);
public ClusterFile process(final Map.Entry<String, List<String>> ltids) {
ClusterFile clusterFile = new ClusterFile();
clusterFile.setName(Prefix.of(ltids.getValue().get(0), properties.prefixLength));
ltids.getValue().forEach(ltid -> {
final List<SinglePlaceClusterPeriod> clusterPeriodList = jdbcTemplate.query("select * from " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE
+ " WHERE ltid= ? ORDER BY " + LTID_COL,
new SinglePlaceClusterPeriodRowMapper(), UUID.fromString(ltid));
SinglePlaceClusterPeriod singlePlaceClusterPeriod = clusterPeriodList.stream().findFirst().orElse(null);
if (null != singlePlaceClusterPeriod) {
long ln = counter.incrementAndGet();
if (0 == ln % 1000) {
log.info("Loaded {} singlePlaceClusterPeriod, current LTId={} ", ln, ltid);
}
List<ClusterPeriod> clusterPeriods = clusterPeriodList.stream().map(mapper::map).collect(Collectors.toList());
clusterFile.addItem(ClusterFileItem.ofCluster(SinglePlaceCluster.builder()
.locationTemporaryPublicId(singlePlaceClusterPeriod.getLocationTemporaryPublicId())
.venueCategory1(singlePlaceClusterPeriod.getVenueCategory1())
.venueCategory2(singlePlaceClusterPeriod.getVenueCategory2())
.venueType(singlePlaceClusterPeriod.getVenueType())
.periods(clusterPeriods)
.build()));
}
List<ClusterPeriod> clusterPeriods = clusterPeriodList.stream().map(mapper::map).collect(Collectors.toList());
return ClusterFileItem.ofCluster(SinglePlaceCluster.builder()
.locationTemporaryPublicId(singlePlaceClusterPeriod.getLocationTemporaryPublicId())
.venueCategory1(singlePlaceClusterPeriod.getVenueCategory1())
.venueCategory2(singlePlaceClusterPeriod.getVenueCategory2())
.venueType(singlePlaceClusterPeriod.getVenueType())
.periods(clusterPeriods)
.build());
}
return null;
});
log.info("Created cluterFile object containing {} items with prefix : {} ",
ltids.getValue().size(),
clusterFile.getName());
return clusterFile;
}
}
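The processor still issues one query per LTID within a prefix, and the `ORDER BY` on a column filtered to a single value is a no-op. A possible follow-up, fetching a whole prefix in one round trip; the IN-clause construction is illustrative and not part of this commit:

```java
// One query for all LTIDs of a prefix instead of one query per LTID (sketch).
List<String> ltidList = ltids.getValue();
String placeholders = ltidList.stream().map(l -> "?").collect(Collectors.joining(", "));
List<SinglePlaceClusterPeriod> periods = jdbcTemplate.query(
        "select * from " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE
                + " where " + LTID_COL + " in (" + placeholders + ")",
        new SinglePlaceClusterPeriodRowMapper(),
        ltidList.stream().map(UUID::fromString).toArray());
```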
package fr.gouv.clea.indexation.readers;
import org.springframework.batch.item.ItemReader;
import java.util.Iterator;
import java.util.concurrent.Callable;
public class IteratorItemReader<T> implements ItemReader<T> {
private Iterator<T> iterator = null;
private final Callable<Iterator<T>> iteratorProvider;
public IteratorItemReader(Callable<Iterator<T>> iteratorProvider) {
this.iteratorProvider = iteratorProvider;
}
@Override
public T read() throws Exception {
if (iterator == null) {
iterator = iteratorProvider.call();
}
if (iterator.hasNext()) {
return iterator.next();
}
return null;
}
}
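Usage as wired in BatchConfig above: the Callable defers iterator creation to the first `read()`, so the map may still be empty when the bean is built. The lazy initialization is unsynchronized, which is fine as long as the indexation step stays single-threaded:

```java
// From BatchConfig in this diff: iterate the prefix -> LTIDs map entries.
MemoryMapItemReader reader = new MemoryMapItemReader(
        prefixesStorageService.getPrefixWithAssociatedLtidsMap().entrySet()::iterator);
```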
package fr.gouv.clea.indexation.readers;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
public class MemoryMapItemReader extends IteratorItemReader<Map.Entry<String, List<String>>> {
public MemoryMapItemReader(Callable<Iterator<Map.Entry<String, List<String>>>> iterator) {
super(iterator);
}
}
package fr.gouv.clea.indexation.readers;
import org.springframework.batch.item.ItemReader;
public class PrefixesMemoryReader implements ItemReader<String> {
@Override
public String read() {
//read items from PrefixesStorageService instance
return null;
}
}
......@@ -5,10 +5,10 @@ import com.fasterxml.jackson.databind.SerializationFeature;
import fr.gouv.clea.config.BatchProperties;
import fr.gouv.clea.indexation.model.output.ClusterFile;
import fr.gouv.clea.indexation.model.output.ClusterFileIndex;
import fr.gouv.clea.indexation.model.output.ClusterFileItem;
import fr.gouv.clea.indexation.model.output.Prefix;
import fr.gouv.clea.prefixes.PrefixesStorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
......@@ -18,53 +18,48 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.Set;
@StepScope
@Slf4j
public class IndexationWriter implements ItemWriter<ClusterFileItem> {
public class IndexationWriter implements ItemWriter<ClusterFile> {
private String outputPath;
private Long jobId;
private final int prefixLength;
private final PrefixesStorageService prefixesStorageService;
@BeforeStep
public void retrieveInterStepData(final StepExecution stepExecution) {
this.jobId = stepExecution.getJobExecutionId();
}
public IndexationWriter(final BatchProperties config) {
@AfterStep
public void createClusterIndex(final StepExecution stepExecution) throws IOException {
generateClusterIndex(prefixesStorageService.getPrefixWithAssociatedLtidsMap().keySet());
}
public IndexationWriter(final BatchProperties config, final PrefixesStorageService prefixesStorageService) {
this.outputPath = config.clusterFilesOutputPath;
this.prefixLength = config.prefixLength;
this.prefixesStorageService = prefixesStorageService;
}
@Override
public void write(List<? extends ClusterFileItem> items) throws Exception {
public void write(List<? extends ClusterFile> items) throws Exception {
log.info("Creating directories : " + outputPath + File.separator + this.jobId + File.separator);
Files.createDirectories(Paths.get(outputPath + File.separator + this.jobId + File.separator));
//generate index json file
final HashMap<String, ClusterFile> clusterIndexMap = new HashMap<>();
items.forEach(clusterFileItem -> {
final String uuid = clusterFileItem.getTemporaryLocationId();
ClusterFile clusterFile = clusterIndexMap.computeIfAbsent(Prefix.of(uuid, prefixLength), key -> new ClusterFile());
clusterFile.addItem(clusterFileItem);
});
//generate cluster files
clusterIndexMap.forEach(this::generateClusterFile);
items.forEach(this::generateClusterFile);
generateClusterIndex(clusterIndexMap);
}
private void generateClusterFile(final String prefix, final ClusterFile clusterFile) {
private void generateClusterFile(final ClusterFile clusterFile) {
final String outputClusterFilePath = outputPath + File.separator + this.jobId + File.separator + prefix + ".json";
final String outputClusterFilePath = outputPath + File.separator + this.jobId + File.separator + clusterFile.getName() + ".json";
log.debug("Generating cluster file : {}", outputClusterFilePath);
Path jsonClusterPath = Paths.get(outputClusterFilePath);
File jsonClusterFile = jsonClusterPath.toFile();
......@@ -78,11 +73,11 @@ public class IndexationWriter implements ItemWriter<ClusterFileItem> {
}
}
private void generateClusterIndex(final HashMap<String, ClusterFile> clusterIndexMap) throws IOException {
private void generateClusterIndex(final Set<String> prefixes) throws IOException {
ClusterFileIndex clusterFileIndex = ClusterFileIndex.builder()
.iteration(jobId.intValue())
.prefixes(clusterIndexMap.keySet())
.prefixes(prefixes)
.build();
log.info("Generating cluster index : " + outputPath + File.separator + "clusterIndex.json");
......
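The index is now generated once per step, in the `@AfterStep` hook, from the complete prefix set instead of a per-chunk map. What the hook assembles, assuming the builder fields shown above (a sketch):

```java
ClusterFileIndex index = ClusterFileIndex.builder()
        .iteration(jobId.intValue())
        .prefixes(prefixesStorageService.getPrefixWithAssociatedLtidsMap().keySet())
        .build();
new ObjectMapper()
        .enable(SerializationFeature.INDENT_OUTPUT)
        .writeValue(new File(outputPath, "clusterIndex.json"), index);
```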
......@@ -3,13 +3,11 @@ package fr.gouv.clea.prefixes;
import fr.gouv.clea.config.BatchProperties;
import fr.gouv.clea.indexation.model.output.Prefix;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemWriter;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@RequiredArgsConstructor
public class PrefixesMemoryWriter implements ItemWriter<List<String>> {
......@@ -26,8 +24,8 @@ public class PrefixesMemoryWriter implements ItemWriter<List<String>> {
public void write(List<? extends List<String>> ltids) {
ltids.get(0).forEach(ltid -> {
final String prefix = Prefix.of(ltid, prefixLength);
prefixesStorageService.getClustersMap().computeIfAbsent(prefix, p -> new ArrayList<>());
prefixesStorageService.getClustersMap().get(prefix).add(ltid);
prefixesStorageService.getPrefixWithAssociatedLtidsMap().computeIfAbsent(prefix, p -> new ArrayList<>());
prefixesStorageService.getPrefixWithAssociatedLtidsMap().get(prefix).add(ltid);
});
}
}
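Since `computeIfAbsent` returns the mapped value, the lookup-then-add pair above collapses into one call (same behavior, sketched below). Note also that this writer still consumes only `ltids.get(0)`; whether that is safe depends on the upstream ListItemReader emitting a single aggregated list per chunk:

```java
prefixesStorageService.getPrefixWithAssociatedLtidsMap()
        .computeIfAbsent(prefix, p -> new ArrayList<>())
        .add(ltid);
```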
......@@ -10,7 +10,6 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
public class PrefixesStorageService {
//FIXME: what kind of concurrent object to use?
@Getter
private final Map<String, List<String>> clustersMap = new ConcurrentHashMap<>();
private final Map<String, List<String>> prefixWithAssociatedLtidsMap = new ConcurrentHashMap<>();
}
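On the FIXME above: ConcurrentHashMap makes the map-level operations atomic, but the ArrayList values are still mutated without synchronization while the multi-threaded prefixes step writes. One option, shown with a hypothetical addLtid helper:

```java
// Wrap each per-prefix list so concurrent add() calls are safe.
private final Map<String, List<String>> prefixWithAssociatedLtidsMap = new ConcurrentHashMap<>();

public void addLtid(String prefix, String ltid) {
    prefixWithAssociatedLtidsMap
            .computeIfAbsent(prefix, p -> Collections.synchronizedList(new ArrayList<>()))
            .add(ltid);
}
```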