package com.mycompany.app;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class App {
/**
 * Relative path of the folder whose entries are parsed as weather data files.
 * Every entry listed in this folder is handed to a {@code FileProcessingCallable}.
 */
private static final String WEATHER_DATA_FOLDER = "weather_data/ghcnd_hcn";
/**
 * Relative path of the text file with station descriptions, read line by line
 * in {@code fillStations}.
 */
private static final String WEATHER_STATIONS_FILE = "weather_data/ghcnd-stations.txt";
/**
 * Number of top results to keep and print.
 */
private static final int TOP_RESULTS_COUNT = 5;
/**
 * Number of {@code SortingCallable} tasks submitted to the thread pool.
 * Note: the size of the pool itself is chosen separately in {@code main}.
 */
private static final int SORTING_THREADS_COUNT = 4;
/**
 * Query entered by the user. Assigned once in {@code getQuery} and afterwards
 * only read (by {@code main} and by the file-processing tasks).
 */
private static Query query;
/**
 * Comparator for WeatherData, created in {@code main} from the MAX/MIN flag of the user query.
 */
private static WeatherData.WeatherDataComparator comparator;
public static void main(String[] args) throws Exception {
//Starting with scanning user input
try (Scanner scanner = new Scanner(System.in)) {
//Obtaining user query
getQuery(scanner);
//Creating comparator based on query MAX parameter
comparator = new WeatherData.WeatherDataComparator(query.max);
// Creating thread pool with fixed number of threads (20 is appropriate number in my environment )
ExecutorService executorService = Executors.newFixedThreadPool(20);
// Creating thread-safe queue
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>();
// List of future result. After creation of all futures, we will try to get their results
List>> filesParsingFutureResult = new ArrayList<>();
// Creating task for each file in folder and sbumit it to the thread pool, saving its future result to list
for (Path file : Files.list(Paths.get(WEATHER_DATA_FOLDER)).collect(Collectors.toList())) {
filesParsingFutureResult.add(executorService.submit(new FileProcessingCallable(file.toString())));
}
// Trying collecting all results of the futures into thread-safe queue
for (Future> future : filesParsingFutureResult) {
queue.addAll(future.get());
}
// This list will contain top results, summarized for all sorting threads
List topResults = new ArrayList<>();
// Futures list for sorting tasks
List>> sortingFutureResult = new ArrayList<>();
// Creating sorting tasks and submit it to thread-pool, saving its future to appropriate collection
for (int i = 0; i < SORTING_THREADS_COUNT; i++) {
sortingFutureResult.add(executorService.submit(new SortingCallable(queue)));
}
// Collecting results into top results list
for (Future> future : sortingFutureResult) {
topResults.addAll(future.get());
}
// Further processing is in the single thread, as allowed in assignment
// Sorting results based on created comparator
topResults.sort(comparator);
// Getting only necessary number of results
topResults = topResults.subList(0, TOP_RESULTS_COUNT);
// Filling station info for each of top results
fillStations(topResults);
// Outputting results
for (int i = 0; i < TOP_RESULTS_COUNT; i++) {
System.out.println(topResults.get(i));
}
// Closing thread-pool
executorService.shutdown();
}
}
/**
 * Attaches station information to each of the given weather data objects.
 *
 * <p>Reads {@code WEATHER_STATIONS_FILE} line by line, indexes the parsed
 * {@code StationData} by its ID and then calls {@code setStationData} on every
 * element of {@code data} with the entry found for {@code getId()}.
 * An element whose ID is not present in the file receives {@code null}.
 *
 * @param data weather data objects to complete; when empty, the stations file is not read
 * @throws Exception if the stations file cannot be opened or read, or a line cannot be parsed
 */
private static void fillStations(List<WeatherData> data) throws Exception {
    if (data.isEmpty()) {
        return;
    }
    // Lookup table: station ID -> station description.
    // NOTE(review): station IDs are assumed to be strings - confirm against StationData.getId().
    Map<String, StationData> stationsById = new HashMap<>();
    try (Scanner stationScanner = new Scanner(new File(WEATHER_STATIONS_FILE))) {
        while (stationScanner.hasNextLine()) {
            StationData stationData = StationData.parseStationData(stationScanner.nextLine());
            stationsById.put(stationData.getId(), stationData);
        }
        // Scanner swallows read errors and just reports "no more lines";
        // surface them instead of silently working with a truncated table.
        java.io.IOException readFailure = stationScanner.ioException();
        if (readFailure != null) {
            throw readFailure;
        }
    }
    for (WeatherData wd : data) {
        wd.setStationData(stationsById.get(wd.getId()));
    }
}
/*
* Callable implementation for WeatherData file reading procedure
*/
static class FileProcessingCallable implements Callable> {
private final String fileName;
/*
* Constructor only takes name of file to process
*/
public FileProcessingCallable(String fileName) {
this.fileName = fileName;
}
/*
* Th body of this callable, opens file, reads it, and returns the list of weather data from this file,
* satisfying user query
*/
@Override
public List call() throws Exception {
List result = new LinkedList<>();
try (Scanner scanner = new Scanner(new File(fileName))) {
while (scanner.hasNextLine()) {
result.addAll(WeatherData.parseWeatherData(scanner.nextLine(), query));
}
}
return result;
}
}
/*
* Callable implementation for WeatherData sorting procedure
*/
static class SortingCallable implements Callable> {
private final Queue queue;
/*
* Constructor only takes queue to read objects from
*/
public SortingCallable(Queue queue) {
this.queue = queue;
}
/*
* Callable body does the following:
* it reads from queue weather data objects one by one and stores only top results,
* based on user query MAX parameter (used via comparator)
*/
@Override
public List call() {
// The final top results list
// Element at 0 position is a candidate for being removed, since it has the worst value among top results
List top = new ArrayList<>();
WeatherData data;
while ((data = queue.poll()) != null) {
if (top.isEmpty()) {
top.add(data);
}
if (comparator.compare(top.get(0), data) > 0) {
top.add(data);
if (top.size() > TOP_RESULTS_COUNT) {
top.remove(0);
}
} else {
if (top.size() < TOP_RESULTS_COUNT) {
top.add(0, data);
}
}
}
return top;
}
}
/* USER CONSOLE MENU static methods */
/**
 * Interactively reads the query parameters (start date, end date, MAX/MIN flag)
 * from the given scanner and stores the resulting {@code Query} in the static
 * {@code query} field.
 *
 * @param scanner source of user input
 */
private static void getQuery(Scanner scanner) {
    System.out.println("Enter your query:");
    int yearFrom = getIntParameter(scanner, "Please, enter year from ", 1000, 2020);
    int monthFrom = getIntParameter(scanner, "Please, enter month from ", 1, 12);
    int dayFrom = getIntParameter(scanner, "Please, enter day from ", 1, 31);
    // The end-of-range prompts must say "to"; otherwise the user is asked for
    // the "from" values twice and cannot tell which bound is being entered.
    int yearTo = getIntParameter(scanner, "Please, enter year to ", 1000, 2020);
    int monthTo = getIntParameter(scanner, "Please, enter month to ", 1, 12);
    int dayTo = getIntParameter(scanner, "Please, enter day to ", 1, 31);
    System.out.println("Do you want find max temperature? ('y'-for max, otherwise - min)");
    boolean max = scanner.nextLine().trim().equalsIgnoreCase("y");
    query = new Query(yearFrom, monthFrom, dayFrom, yearTo, monthTo, dayTo, max);
}
/**
 * Prompts until the user enters an integer within {@code [lower, upper]}.
 *
 * <p>The prompt is printed before every attempt. Surrounding whitespace in the
 * answer is ignored, consistently with how the y/n answer is read in {@code getQuery}.
 *
 * @param scanner source of user input
 * @param message prompt text; the allowed range is appended to it
 * @param lower   smallest accepted value (inclusive)
 * @param upper   largest accepted value (inclusive)
 * @return the first entered value that lies within the range
 */
private static int getIntParameter(Scanner scanner, String message, int lower, int upper) {
    while (true) {
        System.out.println(message + "[" + lower + "," + upper + "]:");
        String answer = scanner.nextLine().trim();
        try {
            int value = Integer.parseInt(answer);
            if (value >= lower && value <= upper) {
                return value;
            }
        } catch (NumberFormatException ignored) {
            // Not a number: handled below exactly like an out-of-range value.
        }
        System.out.println("Invalid input. Please, try again.");
    }
}
/**
 * Plain holder of the query parameters: an inclusive date range given as
 * separate year/month/day components, plus the MAX/MIN flag.
 * Also knows how to test whether a weather data record falls into the range.
 */
static class Query {
    int yearFrom;
    int monthFrom;
    int dayFrom;
    int yearTo;
    int monthTo;
    int dayTo;
    boolean max;

    Query(int yearFrom, int monthFrom, int dayFrom, int yearTo, int monthTo, int dayTo, boolean max) {
        this.yearFrom = yearFrom;
        this.monthFrom = monthFrom;
        this.dayFrom = dayFrom;
        this.yearTo = yearTo;
        this.monthTo = monthTo;
        this.dayTo = dayTo;
        this.max = max;
    }

    /**
     * Tells whether the date of the given record lies within this query's range,
     * both bounds included.
     *
     * @param weatherData record to test
     * @return {@code true} if the record's date is not before the start date and
     *         not after the end date, {@code false} otherwise
     */
    public boolean accepts(WeatherData weatherData) {
        int y = weatherData.getYear();
        int m = weatherData.getMonth();
        int d = weatherData.getDay();

        boolean onOrAfterStart = compareDates(y, m, d, yearFrom, monthFrom, dayFrom) >= 0;
        boolean onOrBeforeEnd = compareDates(y, m, d, yearTo, monthTo, dayTo) <= 0;
        return onOrAfterStart && onOrBeforeEnd;
    }

    /**
     * Compares two dates component by component: year first, then month, then day.
     *
     * @return a negative value, zero or a positive value when the first date is
     *         before, equal to or after the second one
     */
    private static int compareDates(int y1, int m1, int d1, int y2, int m2, int d2) {
        if (y1 != y2) {
            return Integer.compare(y1, y2);
        }
        if (m1 != m2) {
            return Integer.compare(m1, m2);
        }
        return Integer.compare(d1, d2);
    }
}
}