Commit 713638ca authored by Bergamote Orange

dedicated clusterfile generation step

cleanup
parent ae41635f
@@ -10,8 +10,6 @@ public class BatchConstants {
public static final String PERIOD_COLUMN = "period_start";
public static final String TIMESLOT_COLUMN = "timeslot";
public static final String LTIDS_PARAM = "ltids";
public static final String LTID_COL = "ltid";
public static final String VENUE_TYPE_COL = "venue_type";
public static final String VENUE_CAT1_COL = "venue_category1";
@@ -22,5 +20,4 @@ public class BatchConstants {
public static final String CLUSTER_START_COL = "cluster_start";
public static final String CLUSTER_DURATION_COL = "cluster_duration_in_seconds";
public static final String RISK_LEVEL_COL = "risk_level";
}
@@ -67,11 +67,6 @@ public class IdentificationStepBatchConfig {
.build();
}
// =================
// Identification step reader
// =================
@Bean
public JdbcCursorItemReader<String> identificationStepReader() {
@@ -87,7 +82,6 @@ public class IdentificationStepBatchConfig {
return new SinglePlaceExposedVisitsBuilder(dataSource, new ExposedVisitRowMapper());
}
@Bean
public ItemProcessor<SinglePlaceExposedVisits, SinglePlaceCluster> singleClusterPlaceBuilder() {
return new SinglePlaceExposedVisitsProcessor(properties, riskConfigurationService);
......
@@ -82,6 +82,7 @@ public class IndexationStepBatchConfig {
@Bean
@StepScope
public ItemReader<Map.Entry<String, List<String>>> memoryMapItemReader(
//values provided through step execution context by prefixPartitioner
@Value("#{stepExecutionContext['prefixes']}") List<String> prefixes,
@Value("#{stepExecutionContext['ltids']}") List<List<String>> ltids) {
return new StepExecutionContextReader(prefixes, ltids);
......
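Note on the reader above: `prefixes` and `ltids` are pulled from the step execution context that the prefixPartitioner mentioned in the comment is expected to populate. As a rough illustration only, a partitioner filling those two keys could look like the sketch below; the class name and the constructor argument are assumptions, not taken from this commit.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

// Hypothetical sketch: spreads a prefix -> ltids map over the requested number of
// partitions, exposing the same context keys the reader above consumes.
public class PrefixPartitionerSketch implements Partitioner {

    private final Map<String, List<String>> prefixWithLtids;

    public PrefixPartitionerSketch(Map<String, List<String>> prefixWithLtids) {
        this.prefixWithLtids = prefixWithLtids;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        List<Map.Entry<String, List<String>>> entries = new ArrayList<>(prefixWithLtids.entrySet());
        int buckets = Math.max(1, Math.min(gridSize, entries.size()));
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int b = 0; b < buckets; b++) {
            List<String> prefixes = new ArrayList<>();
            List<List<String>> ltids = new ArrayList<>();
            for (int i = b; i < entries.size(); i += buckets) {
                prefixes.add(entries.get(i).getKey());
                ltids.add(entries.get(i).getValue());
            }
            ExecutionContext context = new ExecutionContext();
            context.put("prefixes", prefixes); // matches #{stepExecutionContext['prefixes']}
            context.put("ltids", ltids);       // matches #{stepExecutionContext['ltids']}
            partitions.put("partition-" + b, context);
        }
        return partitions;
    }
}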
@@ -26,7 +26,6 @@ public class NoPersistenceBatchConfigurer extends DefaultBatchConfigurer {
super.setDataSource(dataSource);
}
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
@@ -37,12 +36,12 @@ public class NoPersistenceBatchConfigurer extends DefaultBatchConfigurer {
factory.setSerializer(new ExecutionContextSerializer() {
@Override
public Map<String, Object> deserialize(InputStream inputStream) throws IOException {
public Map<String, Object> deserialize(InputStream inputStream) {
return null;
}
@Override
public void serialize(Map<String, Object> object, OutputStream outputStream) throws IOException {
public void serialize(Map<String, Object> object, OutputStream outputStream) {
// Noop
}
});
......
@@ -56,5 +56,4 @@ public class PrefixesStepBatchConfig {
reader.setRowMapper((rs, i) -> rs.getString(1));
return new ListItemReader(reader);
}
}
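Side note on the hunk above: the JdbcCursorItemReader is handed to a ListItemReader, which suggests the cursor is drained eagerly into an in-memory list. A minimal standalone sketch of that idea, using only standard Spring Batch types, is shown here; the helper name is an assumption and not part of this commit.

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.ListItemReader;

// Hypothetical helper: reads every row from the cursor up front and wraps the
// result in a list-backed reader, so later processing does not keep a cursor open.
public final class CursorToListReaderSketch {

    private CursorToListReaderSketch() {
    }

    public static ListItemReader<String> drain(JdbcCursorItemReader<String> reader) throws Exception {
        reader.open(new ExecutionContext());
        try {
            List<String> items = new ArrayList<>();
            String item;
            while ((item = reader.read()) != null) {
                items.add(item);
            }
            return new ListItemReader<>(items);
        } finally {
            reader.close();
        }
    }
}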
@@ -13,6 +13,7 @@ import org.springframework.batch.repeat.RepeatStatus;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Set;
@@ -30,12 +31,16 @@ public class GenerateClusterIndexTasklet implements Tasklet {
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
generateClusterIndex(chunkContext.getStepContext().getJobInstanceId(), prefixesStorageService.getPrefixWithAssociatedLtidsMap().keySet());
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws IOException {
final Long jobId = chunkContext.getStepContext().getJobInstanceId();
log.info("Creating directories if not exists: " + outputPath + File.separator + jobId + File.separator);
Files.createDirectories(Paths.get(outputPath + File.separator + jobId + File.separator));
generateClusterIndex(jobId, prefixesStorageService.getPrefixWithAssociatedLtidsMap().keySet());
return RepeatStatus.FINISHED;
}
private void generateClusterIndex(final Long jobId, final Set<String> prefixes) throws IOException {
ClusterFileIndex clusterFileIndex = ClusterFileIndex.builder()
......
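To connect this tasklet with the commit title (a dedicated cluster-index generation step), a tasklet like the one above is typically wired into its own step roughly as sketched below; the bean names and the StepBuilderFactory wiring are assumptions for illustration, not code from this commit.

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical wiring: runs the tasklet above as its own step so the cluster index
// is generated once per job, after the cluster files have been written.
@Configuration
public class ClusterIndexStepSketchConfig {

    private final StepBuilderFactory stepBuilderFactory;
    private final GenerateClusterIndexTasklet generateClusterIndexTasklet;

    public ClusterIndexStepSketchConfig(StepBuilderFactory stepBuilderFactory,
                                        GenerateClusterIndexTasklet generateClusterIndexTasklet) {
        this.stepBuilderFactory = stepBuilderFactory;
        this.generateClusterIndexTasklet = generateClusterIndexTasklet;
    }

    @Bean
    public Step clusterIndexGenerationStepSketch() {
        return stepBuilderFactory.get("clusterIndexGeneration")
                .tasklet(generateClusterIndexTasklet)
                .build();
    }
}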
@@ -41,7 +41,7 @@ public class IndexationWriter implements ItemWriter<ClusterFile> {
@Override
public void write(List<? extends ClusterFile> clusterFile) throws Exception {
log.info("Creating directories : " + outputPath + File.separator + this.jobId + File.separator);
log.info("Creating directories if not exists: " + outputPath + File.separator + jobId + File.separator);
Files.createDirectories(Paths.get(outputPath + File.separator + this.jobId + File.separator));
......
@@ -6,7 +6,7 @@ spring:
password: password
batch:
# Manage it's batch metadata tables
# Manage it's batch metadata tables
initialize-schema: always
cluster:
output-path: /tmp/v1
\ No newline at end of file
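For reference, the cluster.output-path value added above would typically be bound to a property holder along these lines; the class and accessor names are assumptions for illustration, not part of this commit.

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

// Hypothetical binding for the `cluster` block in the YAML above.
@Configuration
@ConfigurationProperties(prefix = "cluster")
public class ClusterOutputPropertiesSketch {

    // Bound from cluster.output-path, e.g. /tmp/v1 in the sample configuration.
    private String outputPath;

    public String getOutputPath() {
        return outputPath;
    }

    public void setOutputPath(String outputPath) {
        this.outputPath = outputPath;
    }
}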