/*
 * Decompiled with CFR 0.152.
 */
package proai.cache;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import net.sf.bvalid.Validator;
import org.apache.log4j.Logger;
import proai.MetadataFormat;
import proai.Record;
import proai.SetInfo;
import proai.cache.CachedMetadataFormat;
import proai.cache.Committer;
import proai.cache.QueueItem;
import proai.cache.QueueIterator;
import proai.cache.RCDatabase;
import proai.cache.RCDisk;
import proai.cache.RecordCache;
import proai.cache.Worker;
import proai.driver.OAIDriver;
import proai.driver.RemoteIterator;
import proai.error.ImmediateShutdownException;
import proai.error.RepositoryException;
import proai.error.ServerException;
import proai.util.SetSpec;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class Updater
extends Thread {
    /** Logger shared by every instance of this class. */
    private static final Logger _LOG = Logger.getLogger(Updater.class.getName());

    // Settings and collaborators handed to the constructor; assigned exactly once there.
    private final int _pollSeconds;
    private final int _maxWorkers;
    private final int _maxWorkBatchSize;
    private final int _maxFailedRetries;
    private final int _maxCommitQueueSize;
    private final int _maxRecordsPerTransaction;
    private final OAIDriver _driver;
    private final RCDatabase _db;
    private final RCDisk _disk;
    private final Validator _validator;

    // Written by shutdown() and polled by run(). shutdown() blocks until this thread
    // has died, so it only completes when called from a different thread; volatile
    // makes the request visible to the updater thread.
    private volatile boolean _shutdownRequested;
    private volatile boolean _immediateShutdownRequested;

    // Per-round processing state, replaced by processQueue() for every round.
    private QueueIterator _queueIterator;
    private Worker[] _workers;
    private Committer _committer;

    // Set by getNextBatch()/handleCommitException() and checked by processQueue()
    // and processingShouldStop().
    private volatile boolean _processingAborted;

    // Human-readable progress marker written by run() and logged by shutdown().
    private volatile String _status;

    /**
     * Creates an updater thread; the thread is not started here.
     *
     * @param driver source of remote OAI data
     * @param cache accepted for the caller's convenience; this constructor does not use it
     * @param db database side of the record cache
     * @param disk disk side of the record cache
     * @param pollSeconds number of one-second sleeps between update cycles
     * @param maxWorkers upper bound on worker threads per processing round
     * @param maxWorkBatchSize maximum number of queue items handed to a worker at once
     * @param maxFailedRetries passed to the database when re-queueing failed records
     * @param maxCommitQueueSize passed through to each Committer
     * @param maxRecordsPerTransaction passed through to each Committer
     * @param validator passed through to each Worker
     */
    public Updater(OAIDriver driver, RecordCache cache, RCDatabase db, RCDisk disk, int pollSeconds, int maxWorkers, int maxWorkBatchSize, int maxFailedRetries, int maxCommitQueueSize, int maxRecordsPerTransaction, Validator validator) {
        // Collaborators.
        _driver = driver;
        _db = db;
        _disk = disk;
        _validator = validator;

        // Tuning parameters.
        _pollSeconds = pollSeconds;
        _maxWorkers = maxWorkers;
        _maxWorkBatchSize = maxWorkBatchSize;
        _maxFailedRetries = maxFailedRetries;
        _maxCommitQueueSize = maxCommitQueueSize;
        _maxRecordsPerTransaction = maxRecordsPerTransaction;
    }

    /**
     * Main loop of the updater thread.
     *
     * <p>Until a shutdown is requested, each cycle processes items already in the
     * queue, polls the remote source, processes newly queued items and prunes old
     * cache files, then sleeps in one-second steps for up to {@code _pollSeconds}
     * seconds. A failed cycle is logged and the loop carries on with the next one.
     */
    @Override
    public void run() {
        _status = "Started";
        while (!_shutdownRequested) {
            long cycleStartTime = System.currentTimeMillis();
            _LOG.info("Update cycle initiated");
            try {
                _status = "Processing any old items in queue";
                checkImmediateShutdown();
                processQueue("old");
                checkImmediateShutdown();
                _status = "Polling and updating queue and database";
                pollAndUpdate();
                _status = "Processing any new items in queue";
                checkImmediateShutdown();
                processQueue("new");
                checkImmediateShutdown();
                _status = "Pruning old files from cache if needed";
                pruneIfNeeded();
                long sec = (System.currentTimeMillis() - cycleStartTime) / 1000L;
                // The two sentences used to be concatenated without any separator.
                _LOG.info("Update cycle finished in " + sec + "sec.  Next cycle scheduled in " + _pollSeconds + "sec.");
            }
            catch (ImmediateShutdownException e) {
                _LOG.info("Update cycle aborted due to immediate shutdown request");
            }
            catch (Throwable th) {
                // Thread boundary: nothing above us can handle the failure, so log and keep cycling.
                _LOG.error("Update cycle failed", th);
            }
            _status = "Sleeping";
            // Sleep in one-second slices so a shutdown request is noticed promptly.
            for (int waitedSeconds = 0; !_shutdownRequested && waitedSeconds < _pollSeconds; ++waitedSeconds) {
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException ignored) {
                    // Deliberately ignored: an interrupt only shortens this slice; the loop
                    // condition alone decides when the wait ends.
                }
            }
        }
        _status = "Finished";
    }

    /**
     * Deletes cache files that the database reports as prunable and then removes
     * the matching prunable entries from the database.
     *
     * <p>The prunable list is first dumped to a temporary UTF-8 file, one
     * {@code "<key> <path>"} pair per line, and then read back. Lines that do not
     * split into exactly two space-separated parts are skipped. Keys are removed
     * from the database in batches.
     *
     * <p>A key is only removed from the database when its file is actually gone
     * (deleted now, or not present at all). A file that could not be deleted keeps
     * its prunable entry so that a later cycle really does try again, as the
     * warning message promises.
     *
     * @throws Exception if the database, the temporary file or the key parsing fails
     */
    private void pruneIfNeeded() throws Exception {
        Connection conn = null;
        File resultFile = null;
        PrintWriter resultWriter = null;
        BufferedReader resultReader = null;
        try {
            conn = RecordCache.getConnection();
            if (_db.getPrunableCount(conn) <= 0) {
                _LOG.info("Pruning is not needed.");
                return;
            }
            resultFile = File.createTempFile("proai-prunable", ".txt");
            resultWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(resultFile), "UTF-8"));
            int numToPrune = _db.dumpPrunables(conn, resultWriter);
            // Close before reading so everything is flushed to the file.
            resultWriter.close();
            _LOG.info("Pruning " + numToPrune + " old files from cache");

            resultReader = new BufferedReader(new InputStreamReader(new FileInputStream(resultFile), "UTF-8"));
            int[] toPruneKeys = new int[32];
            int pending = 0;
            for (String line = resultReader.readLine(); line != null; line = resultReader.readLine()) {
                String[] parts = line.split(" ");
                if (parts.length != 2) {
                    continue;
                }
                int pruneKey = Integer.parseInt(parts[0]);
                if (!removeCacheFile(parts[1])) {
                    // Keep the database entry; the file is still on disk.
                    continue;
                }
                toPruneKeys[pending++] = pruneKey;
                if (pending == toPruneKeys.length) {
                    _db.deletePrunables(conn, toPruneKeys, pending);
                    pending = 0;
                }
            }
            if (pending > 0) {
                _db.deletePrunables(conn, toPruneKeys, pending);
            }
        }
        finally {
            if (resultWriter != null) {
                try {
                    resultWriter.close();
                }
                catch (Exception e) {
                    _LOG.debug("Unable to close prunable list writer", e);
                }
            }
            // Closed independently of the writer so the read handle is never leaked.
            if (resultReader != null) {
                try {
                    resultReader.close();
                }
                catch (Exception e) {
                    _LOG.debug("Unable to close prunable list reader", e);
                }
            }
            if (resultFile != null && !resultFile.delete()) {
                _LOG.warn("Unable to delete temporary file: " + resultFile.getPath());
            }
            if (conn != null) {
                RecordCache.releaseConnection(conn);
            }
        }
    }

    /**
     * Removes a single cache file from disk.
     *
     * @param path cache-relative path, resolved through {@code _disk.getFile}
     * @return {@code true} if the file is gone afterwards (deleted now or never
     *         existed), {@code false} if it exists and could not be deleted
     */
    private boolean removeCacheFile(String path) throws Exception {
        File file = _disk.getFile(path);
        if (!file.exists()) {
            _LOG.debug("No need to delete non-existing old cache file: " + path);
            return true;
        }
        if (file.delete()) {
            _LOG.debug("Deleted old cache file: " + path);
            return true;
        }
        _LOG.warn("Unable to delete old cache file (will try again later): " + path);
        return false;
    }

    /**
     * Cooperative cancellation point.
     *
     * @throws ImmediateShutdownException if an immediate shutdown has been requested
     */
    private void checkImmediateShutdown() throws ImmediateShutdownException {
        if (!_immediateShutdownRequested) {
            return;
        }
        throw new ImmediateShutdownException();
    }

    /**
     * Asks the update loop to stop and blocks until this thread has terminated.
     * Does nothing if the thread is not alive.
     *
     * @param immediate when {@code true}, the immediate-shutdown flag is raised as
     *        well, which makes {@code checkImmediateShutdown()} throw at its next call
     */
    public void shutdown(boolean immediate) {
        if (!isAlive()) {
            return;
        }
        _shutdownRequested = true;
        _immediateShutdownRequested = immediate;
        // Poll every quarter second, reporting what the updater is currently doing.
        while (isAlive()) {
            _LOG.info("Waiting for updater to finish.  Current status: " + _status);
            try {
                Thread.sleep(250L);
            }
            catch (InterruptedException ignored) {
                // Keep waiting; only the thread's death ends this loop.
            }
        }
        _LOG.info("Updater shutdown complete");
    }

    /**
     * Phase one of an update cycle, run inside a single database transaction:
     * re-queues failed records and, when polling is enabled and the remote source
     * reports a date later than the earliest poll date, refreshes the identify
     * data, formats and sets and queues the updated records.
     *
     * <p>On any failure the transaction is rolled back. Auto-commit is switched
     * back on before the connection is released, so the next user of that
     * connection does not inherit transaction mode.
     *
     * @throws ServerException wrapping whatever went wrong; this includes an
     *         immediate-shutdown request raised while this method is running
     */
    private void pollAndUpdate() throws ServerException {
        Connection conn = null;
        boolean startedTransaction = false;
        try {
            conn = RecordCache.getConnection();
            conn.setAutoCommit(false);
            startedTransaction = true;
            _db.queueFailedRecords(conn, _maxFailedRetries);
            if (_db.isPollingEnabled(conn)) {
                long latestRemoteDate = _driver.getLatestDate().getTime();
                if (latestRemoteDate > _db.getEarliestPollDate(conn)) {
                    _LOG.info("Starting update process; source data of interest may have changed.");
                    checkImmediateShutdown();
                    updateIdentify(conn);
                    checkImmediateShutdown();
                    List<String> allPrefixes = updateFormats(conn);
                    checkImmediateShutdown();
                    updateSets(conn);
                    checkImmediateShutdown();
                    queueUpdatedRecords(conn, allPrefixes, latestRemoteDate);
                } else {
                    _LOG.info("Skipping update process; source data of interest has not changed");
                }
            } else {
                _LOG.info("Remote polling skipped -- polling is disabled");
            }
            conn.commit();
        }
        catch (Throwable th) {
            if (startedTransaction) {
                try {
                    conn.rollback();
                }
                catch (SQLException e) {
                    _LOG.error("Failed to roll back failed transaction", e);
                }
            }
            throw new ServerException("Update cycle phase one aborted", th);
        }
        finally {
            if (conn != null) {
                try {
                    if (startedTransaction) {
                        // Undo the setAutoCommit(false) from above; the cleanup used to
                        // call setAutoCommit(false) a second time, which restored nothing.
                        conn.setAutoCommit(true);
                    }
                }
                catch (SQLException e) {
                    _LOG.error("Failed to set autoCommit to true", e);
                }
                finally {
                    RecordCache.releaseConnection(conn);
                }
            }
        }
    }

    /**
     * Has the disk cache write the driver's 'Identify' data and stores whatever
     * that write returns in the database as the identify path.
     *
     * @param conn connection on which the database update is made
     */
    private void updateIdentify(Connection conn) throws Exception {
        _LOG.info("Getting 'Identify' xml from remote source...");
        _db.setIdentifyPath(
                conn,
                _disk.write(_driver));
    }

    /**
     * Synchronises the cached metadata formats with those the driver lists:
     * every remote format is put into the database, and every cached format whose
     * prefix the driver no longer lists is deleted.
     *
     * @param conn connection on which all database changes are made
     * @return the prefixes of the formats listed by the driver, in listing order
     * @throws Exception on driver or database failure, or if an immediate
     *         shutdown is requested part-way through
     */
    private List<String> updateFormats(Connection conn) throws Exception {
        _LOG.info("Updating metadata formats...");
        RemoteIterator<? extends MetadataFormat> remoteFormats = _driver.listMetadataFormats();
        List<String> remotePrefixes = new ArrayList<String>();
        try {
            while (remoteFormats.hasNext()) {
                checkImmediateShutdown();
                MetadataFormat remote = remoteFormats.next();
                _db.putFormat(conn, remote);
                remotePrefixes.add(remote.getPrefix());
            }
        }
        finally {
            // Closing is best effort; a failure here must not hide the real outcome.
            try {
                remoteFormats.close();
            }
            catch (Exception closeFailure) {
                _LOG.warn("Unable to close remote metadata format iterator", closeFailure);
            }
        }

        // Drop whatever is cached but no longer offered remotely.
        for (CachedMetadataFormat cached : _db.getFormats(conn)) {
            String cachedPrefix = cached.getPrefix();
            if (!remotePrefixes.contains(cachedPrefix)) {
                checkImmediateShutdown();
                _db.deleteFormat(conn, cachedPrefix);
            }
        }
        return remotePrefixes;
    }

    /**
     * Synchronises the cached sets with those the driver lists.
     *
     * <p>Three passes: (1) every remote set is written to disk and stored, while
     * noting parents that had not been seen at the time their child arrived;
     * (2) for each such parent, every set returned by {@code SetSpec.allSetsFor}
     * that is still unknown is added with default info; (3) cached sets that are
     * in neither group are deleted.
     *
     * @param conn connection on which all database changes are made
     * @throws RepositoryException if a noted parent spec is not valid
     * @throws Exception on driver, disk or database failure, or if an immediate
     *         shutdown is requested part-way through
     */
    private void updateSets(Connection conn) throws Exception {
        _LOG.info("Updating sets...");
        RemoteIterator<? extends SetInfo> remoteSets = _driver.listSetInfo();
        HashSet<String> knownSpecs = new HashSet<String>();
        HashSet<String> unseenParents = new HashSet<String>();
        try {
            while (remoteSets.hasNext()) {
                checkImmediateShutdown();
                SetInfo remote = remoteSets.next();
                String spec = remote.getSetSpec();
                if (SetSpec.hasParents(spec)) {
                    if (!knownSpecs.contains(SetSpec.parentOf(spec))) {
                        unseenParents.add(SetSpec.parentOf(spec));
                    }
                }
                _db.putSetInfo(conn, spec, _disk.write(remote));
                knownSpecs.add(spec);
            }
        }
        finally {
            try {
                remoteSets.close();
            }
            catch (Exception closeFailure) {
                _LOG.warn("Unable to close remote set info iterator", closeFailure);
            }
        }

        // Fill in sets that the remote listing implied but did not list.
        for (String parent : unseenParents) {
            if (!SetSpec.isValid(parent)) {
                throw new RepositoryException("SetSpec '" + parent + "' is malformed");
            }
            for (String implied : SetSpec.allSetsFor(parent)) {
                if (!knownSpecs.contains(implied)) {
                    _db.putSetInfo(conn, implied, _disk.write(SetSpec.defaultInfoFor(implied)));
                    knownSpecs.add(implied);
                    _LOG.warn("Adding missing set: " + implied);
                }
            }
        }

        // Remove cached sets that are no longer present.
        Iterator<SetInfo> cachedSets = _db.getSetInfo(conn).iterator();
        while (cachedSets.hasNext()) {
            String cachedSpec = cachedSets.next().getSetSpec();
            if (!knownSpecs.contains(cachedSpec)) {
                checkImmediateShutdown();
                _db.deleteSet(conn, cachedSpec);
            }
        }
    }

    /**
     * Queues, for every given metadata prefix, the records the driver reports as
     * changed between that prefix's last poll date and {@code latestRemoteDate},
     * then advances the prefix's last poll date. Prefixes whose last poll date is
     * not earlier than {@code latestRemoteDate} are skipped.
     *
     * <p>Any failure while listing or queueing, including an immediate-shutdown
     * request, propagates to the caller. The remote iterator is still closed on
     * the way out, but closing no longer swallows the exception (the earlier
     * {@code continue} inside {@code finally} did exactly that).
     *
     * @param conn connection on which records are queued and poll dates stored
     * @param allPrefixes metadata prefixes to query
     * @param latestRemoteDate upper bound of the query window, in epoch milliseconds
     * @throws Exception on driver or database failure, or on immediate shutdown
     */
    private void queueUpdatedRecords(Connection conn, List<String> allPrefixes, long latestRemoteDate) throws Exception {
        _LOG.info("Querying and queueing updated records...");
        long queueStartTime = System.currentTimeMillis();
        int totalQueuedCount = 0;
        for (String mdPrefix : allPrefixes) {
            long lastPollDate = _db.getLastPollDate(conn, mdPrefix);
            if (lastPollDate >= latestRemoteDate) {
                _LOG.info("Skipping " + mdPrefix + " records because " + lastPollDate + " is not less than " + latestRemoteDate);
                continue;
            }
            _LOG.info("Querying for changed " + mdPrefix + " records because " + lastPollDate + " is less than " + latestRemoteDate);
            checkImmediateShutdown();
            RemoteIterator<? extends Record> riter = _driver.listRecords(new Date(lastPollDate), new Date(latestRemoteDate), mdPrefix);
            try {
                int queuedCount = 0;
                while (riter.hasNext()) {
                    Record record = riter.next();
                    checkImmediateShutdown();
                    _db.queueRemoteRecord(conn, record.getItemID(), record.getPrefix(), record.getSourceInfo());
                    ++queuedCount;
                }
                _LOG.info("Queued " + queuedCount + " new/modified " + mdPrefix + " records.");
                // Only reached when every record was queued, so the poll date never
                // moves past records that were not actually queued.
                _db.setLastPollDate(conn, mdPrefix, latestRemoteDate);
                totalQueuedCount += queuedCount;
            }
            finally {
                // No jump statement here: leaving finally normally lets a pending
                // exception from the try block continue to propagate.
                try {
                    riter.close();
                }
                catch (Exception e) {
                    _LOG.warn("Unable to close remote record iterator", e);
                }
            }
        }
        long sec = (System.currentTimeMillis() - queueStartTime) / 1000L;
        _LOG.info("Queued " + totalQueuedCount + " total new/modified records in " + sec + "sec.");
    }

    /**
     * Asks the database for the current queue size, using a connection that is
     * obtained and released within this call.
     *
     * @return the queue size reported by the database
     */
    private int countItemsInQueue() throws Exception {
        Connection dbConn = RecordCache.getConnection();
        try {
            return _db.getQueueSize(dbConn);
        }
        finally {
            RecordCache.releaseConnection(dbConn);
        }
    }

    /**
     * Dumps the database queue into a temporary UTF-8 file and returns a
     * QueueIterator constructed over that file.
     *
     * <p>The temporary file is deleted in the finally block on every path,
     * including the successful one, i.e. immediately after the iterator has been
     * constructed.
     * NOTE(review): this presumably relies on QueueIterator having opened or
     * consumed the file in its constructor -- confirm against QueueIterator.
     *
     * @return an iterator over the dumped queue
     */
    private QueueIterator newQueueIterator() throws Exception {
        Connection dbConn = null;
        File dumpFile = null;
        PrintWriter dumpWriter = null;
        try {
            dbConn = RecordCache.getConnection();
            dumpFile = File.createTempFile("proai-queue", ".txt");
            dumpWriter = new PrintWriter(
                    new OutputStreamWriter(new FileOutputStream(dumpFile), "UTF-8"));
            _db.dumpQueue(dbConn, dumpWriter);
            // Flush everything to the file before the iterator is built over it.
            dumpWriter.close();
            return new QueueIterator(dumpFile);
        }
        finally {
            if (dumpWriter != null) {
                try {
                    dumpWriter.close();
                }
                catch (Exception ignored) {
                    // best effort
                }
            }
            if (dumpFile != null) {
                dumpFile.delete();
            }
            RecordCache.releaseConnection(dbConn);
        }
    }

    /**
     * Works through the queue in rounds until it is empty or processing is aborted.
     *
     * <p>Each round builds a fresh queue iterator and Committer, starts between one
     * and {@code _maxWorkers} Worker threads (one per {@code _maxWorkBatchSize}
     * queued items), starts the committer and waits for the committer thread to
     * die. After every round the iterator is closed, statistics are logged and the
     * queue size is re-read.
     *
     * @param kind label used only in the opening log message
     * @throws ServerException if processing was flagged as aborted
     * @throws Exception on database failure or on immediate shutdown
     */
    private void processQueue(String kind) throws Exception {
        _LOG.info("Processing " + kind + " records in queue...");
        int remaining = countItemsInQueue();
        checkImmediateShutdown();
        if (remaining <= 0) {
            _LOG.info("Queue is empty.  No processing needed.");
            return;
        }

        final long startedAt = System.currentTimeMillis();
        _processingAborted = false;
        while (remaining > 0 && !_processingAborted) {
            try {
                _queueIterator = newQueueIterator();
                _committer = new Committer(this, _db, _maxCommitQueueSize, _maxRecordsPerTransaction);

                // One worker per full batch, capped at the maximum, but never zero.
                int workerCount = Math.min(remaining / _maxWorkBatchSize, _maxWorkers);
                if (workerCount == 0) {
                    workerCount = 1;
                }
                _LOG.info("Queue has " + remaining + " records.  Starting " + workerCount + " worker threads for processing.");

                Worker[] spawned = new Worker[workerCount];
                _workers = spawned;
                for (int slot = 0; slot < spawned.length; ++slot) {
                    spawned[slot] = new Worker(slot + 1, spawned.length, this, _driver, _disk, _validator);
                    spawned[slot].start();
                }

                _committer.start();
                // The round is over once the committer thread has died.
                while (_committer.isAlive()) {
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (Exception ignored) {
                        // keep waiting
                    }
                }
                checkImmediateShutdown();
            }
            finally {
                if (_queueIterator != null) {
                    _queueIterator.close();
                }
                if (_workers != null) {
                    logProcessingStats(remaining, System.currentTimeMillis() - startedAt);
                    _workers = null;
                    _committer = null;
                }
            }
            remaining = countItemsInQueue();
        }

        if (_processingAborted) {
            throw new ServerException("Queue processing was aborted due to unexpected error (see above)");
        }
    }

    /**
     * Records that a commit failed: logs the cause and flags processing as aborted.
     *
     * <p>The flag is set while holding this object's monitor, the same monitor
     * {@code getNextBatch()} takes when it sets the flag and
     * {@code processingShouldStop()} takes when it reads it, so the abort is
     * visible to the next caller of {@code processingShouldStop()}.
     *
     * @param th the failure that caused the abort
     */
    protected void handleCommitException(Throwable th) {
        _LOG.warn("Processing aborted due to commit failure", th);
        synchronized (this) {
            _processingAborted = true;
        }
    }

    /**
     * Hands finished items to the committer and fetches the next batch of work.
     *
     * <p>If processing should stop, nothing is handed off and {@code null} is
     * returned. Otherwise up to {@code _maxWorkBatchSize} items are taken from the
     * queue iterator while holding its monitor. A failure while reading the queue
     * is logged, flags processing as aborted and yields {@code null}.
     *
     * @param finishedItems items the caller has finished with, or {@code null} if none
     * @return the next batch, or {@code null} when there is no more work or
     *         processing should stop
     */
    protected List<QueueItem> getNextBatch(List<QueueItem> finishedItems) {
        if (processingShouldStop()) {
            return null;
        }
        if (finishedItems != null) {
            _committer.handoff(finishedItems);
        }
        List<QueueItem> nextBatch = null;
        try {
            // Lock and read through the same reference, so the object being
            // iterated is always the one whose monitor is held.
            final QueueIterator queue = _queueIterator;
            synchronized (queue) {
                if (queue.hasNext()) {
                    nextBatch = new ArrayList<QueueItem>();
                    while (queue.hasNext() && nextBatch.size() < _maxWorkBatchSize) {
                        nextBatch.add(queue.next());
                    }
                }
            }
        }
        catch (Throwable th) {
            // This block only guards queue iteration; the message used to blame a
            // commit failure, which is reported by handleCommitException() instead.
            _LOG.warn("Processing aborted due to failure reading from queue", th);
            synchronized (this) {
                _processingAborted = true;
            }
            nextBatch = null;
        }
        return nextBatch;
    }

    /**
     * Tells whether queue processing should stop.
     *
     * @return {@code true} if processing has been flagged as aborted or an
     *         immediate shutdown has been requested
     */
    protected synchronized boolean processingShouldStop() {
        if (_processingAborted) {
            return true;
        }
        return _immediateShutdownRequested;
    }

    /**
     * Tells whether at least one worker thread of the current round is alive.
     *
     * @return {@code false} if there are no workers or none of them is alive
     */
    protected boolean anyWorkersAreRunning() {
        if (_workers == null) {
            return false;
        }
        int idx = 0;
        while (idx < _workers.length) {
            if (_workers[idx].isAlive()) {
                return true;
            }
            ++idx;
        }
        return false;
    }

    /**
     * Logs a summary of the processing round that just finished, built from the
     * current committer and workers.
     *
     * <p>Every average is reported as zero when its divisor is zero (no fetch
     * attempts, no transactions, or zero elapsed time). This method is called from
     * a {@code finally} block, so it must not throw: the previous integer division
     * by {@code attemptedCount} raised ArithmeticException when no record had been
     * attempted, hiding whatever exception was already propagating.
     *
     * @param initialQueueSize queue size at the start of the round
     * @param totalDuration elapsed processing time in milliseconds
     */
    private void logProcessingStats(int initialQueueSize, long totalDuration) {
        StringBuilder stats = new StringBuilder();
        int recordsProcessed = _committer.getProcessedCount();
        stats.append("    Records processed        : " + recordsProcessed + " of " + initialQueueSize + " on queue\n");
        stats.append("    Total processing time    : " + Updater.getHMSString(totalDuration) + "\n");
        double processingRate = totalDuration > 0L
                ? (double)recordsProcessed / ((double)totalDuration / 1000.0)
                : 0.0;
        stats.append("    Processing rate          : " + Updater.round(processingRate) + " records/second\n");

        Worker[] workers = _workers;
        stats.append("    Workers spawned          : " + workers.length + " of " + _maxWorkers + " maximum\n");
        int failedCount = 0;
        int attemptedCount = 0;
        long totalFetchTime = 0L;
        for (int i = 0; i < workers.length; ++i) {
            if (workers[i] == null) {
                // Slot never filled because worker start-up failed part-way.
                continue;
            }
            failedCount += workers[i].getFailedCount();
            attemptedCount += workers[i].getAttemptedCount();
            totalFetchTime += workers[i].getTotalFetchTime();
        }
        stats.append("    Failed record loads      : " + failedCount + " of " + attemptedCount + " attempted\n");
        long msPerAttempt = attemptedCount > 0 ? totalFetchTime / (long)attemptedCount : 0L;
        stats.append("    Avg roundtrip fetch time : " + Updater.getHMSString(msPerAttempt) + "\n");

        int transactionCount = _committer.getTransactionCount();
        stats.append("    Total DB transactions    : " + transactionCount + "\n");
        stats.append("    Total transaction time   : " + Updater.getHMSString(_committer.getTotalCommitTime()) + "\n");
        long msPerTrans = transactionCount > 0
                ? Math.round((double)_committer.getTotalCommitTime() / (double)transactionCount)
                : 0L;
        stats.append("    Avg time/transaction     : " + Updater.getHMSString(msPerTrans) + "\n");
        double recsPerTrans = transactionCount > 0
                ? (double)recordsProcessed / (double)transactionCount
                : 0.0;
        stats.append("    Avg recs/transaction     : " + Updater.round(recsPerTrans) + " of " + _maxRecordsPerTransaction + " maximum\n");
        _LOG.info("A round of queue processing has finished.\n\nProcessing Stats:\n" + stats.toString());
    }

    /**
     * Rounds a value to two decimal places.
     *
     * @param val value to round
     * @return {@code val} scaled by 100, rounded with {@link Math#round(double)}
     *         and scaled back
     */
    private static double round(double val) {
        final long hundredths = Math.round(val * 100.0);
        return hundredths / 100.0;
    }

    /**
     * Formats a duration in milliseconds as
     * {@code "[H hours, ][M minutes, ]S.mmm seconds"}.
     *
     * <p>The hours and minutes parts appear only when greater than zero. The
     * fraction is the millisecond remainder zero-padded to three digits; a
     * remainder that is not positive is printed as {@code 000}.
     *
     * @param ms duration in milliseconds
     * @return the formatted duration
     */
    private static String getHMSString(long ms) {
        final long hours = ms / 3600000L;
        long rest = ms % 3600000L;
        final long minutes = rest / 60000L;
        rest %= 60000L;
        final long seconds = rest / 1000L;
        final long millis = rest % 1000L;

        StringBuilder text = new StringBuilder();
        if (hours > 0L) {
            text.append(hours).append(" hours, ");
        }
        if (minutes > 0L) {
            text.append(minutes).append(" minutes, ");
        }
        text.append(seconds).append('.');
        if (millis <= 0L) {
            text.append("000");
        } else {
            // Left-pad the fraction to exactly three digits.
            if (millis < 100L) {
                text.append('0');
            }
            if (millis < 10L) {
                text.append('0');
            }
            text.append(millis);
        }
        return text.append(" seconds").toString();
    }
}

