package com.liquidwarelabs.stratusphere.data.inspection.processor.copy;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.Monitor;
import com.liquidwarelabs.common.db.TransactionFactory;
import com.liquidwarelabs.common.db.copy.CopyInConverter;
import com.liquidwarelabs.common.db.copy.SqlBulkCopier;
import com.liquidwarelabs.common.eventbus.AbstractEventBusUser;
import com.liquidwarelabs.common.executorservice.ScheduledExecutorServiceFactory;
import com.liquidwarelabs.common.json.MapJsonProvider;
import com.liquidwarelabs.common.properties.PropertiesChangeEvent;
import com.liquidwarelabs.common.properties.PropertiesUtil;
import com.liquidwarelabs.stratusphere.dao.node.NodeDao;
import com.liquidwarelabs.stratusphere.data.inspection.processor.copy.converters.AbstractSummaryReportCopyInConverter;
import com.liquidwarelabs.stratusphere.data.inspection.reader.InspectionReportReadEvent;
import com.trustednetworktech.common.util.DateUtil;
import io.nats.streaming.ConnectionLostHandler;
import io.nats.streaming.Message;
import io.nats.streaming.MessageHandler;
import io.nats.streaming.NatsStreaming;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;

/* loaded from: input_file:com/liquidwarelabs/stratusphere/data/inspection/processor/copy/SqlBulkCopierInspectionDataProcessor.class */
public final class SqlBulkCopierInspectionDataProcessor extends AbstractEventBusUser {
    private List<CopyInConverter> copyInConverters;
    private PropertiesUtil propertiesUtil;
    private SqlBulkCopier sqlBulkCopier;
    private String propertiesFile;
    private volatile int maxBatchSize;
    private volatile int maxBatchTimeSec;
    private NodeDao nodeDao;
    private TransactionFactory transactionFactory;
    private String natsClusterName;
    private String processedReportsTopicName;
    private StreamingConnection nats;
    private Subscription processReportsSub;
    private MapJsonProvider mapJsonProvider;
    private Long natsReconnectTimeoutSec;
    private Cache<Date, InspectionDataBatch> inspectionDataBatches;
    private ScheduledExecutorServiceFactory scheduledExecutorServiceFactory;
    private final AtomicBoolean isInit = new AtomicBoolean();
    private final Monitor natsInitMonitor = new Monitor();
    private final AtomicInteger activeCount = new AtomicInteger();
    private final Monitor eventMonitor = new Monitor();
    private Monitor.Guard eventGuard = new Monitor.Guard(this.eventMonitor) { // from class: com.liquidwarelabs.stratusphere.data.inspection.processor.copy.SqlBulkCopierInspectionDataProcessor.1
        public boolean isSatisfied() {
            return SqlBulkCopierInspectionDataProcessor.this.activeCount.get() < Runtime.getRuntime().availableProcessors();
        }
    };

    public Long getNatsReconnectTimeoutSec() {
        return this.natsReconnectTimeoutSec;
    }

    public void setNatsReconnectTimeoutSec(Long l) {
        this.natsReconnectTimeoutSec = l;
    }

    public MapJsonProvider getMapJsonProvider() {
        return this.mapJsonProvider;
    }

    public void setMapJsonProvider(MapJsonProvider mapJsonProvider) {
        this.mapJsonProvider = mapJsonProvider;
    }

    public String getNatsClusterName() {
        return this.natsClusterName;
    }

    public void setNatsClusterName(String str) {
        this.natsClusterName = str;
    }

    public String getProcessedReportsTopicName() {
        return this.processedReportsTopicName;
    }

    public void setProcessedReportsTopicName(String str) {
        this.processedReportsTopicName = str;
    }

    public TransactionFactory getTransactionFactory() {
        return this.transactionFactory;
    }

    public void setTransactionFactory(TransactionFactory transactionFactory) {
        this.transactionFactory = transactionFactory;
    }

    public List<CopyInConverter> getCopyInConverters() {
        return this.copyInConverters;
    }

    public void setCopyInConverters(List<CopyInConverter> list) {
        this.copyInConverters = list;
    }

    public NodeDao getNodeDao() {
        return this.nodeDao;
    }

    public void setNodeDao(NodeDao nodeDao) {
        this.nodeDao = nodeDao;
    }

    public ScheduledExecutorServiceFactory getScheduledExecutorServiceFactory() {
        return this.scheduledExecutorServiceFactory;
    }

    public void setScheduledExecutorServiceFactory(ScheduledExecutorServiceFactory scheduledExecutorServiceFactory) {
        this.scheduledExecutorServiceFactory = scheduledExecutorServiceFactory;
    }

    public PropertiesUtil getPropertiesUtil() {
        return this.propertiesUtil;
    }

    public void setPropertiesUtil(PropertiesUtil propertiesUtil) {
        this.propertiesUtil = propertiesUtil;
    }

    public SqlBulkCopier getSqlBulkCopier() {
        return this.sqlBulkCopier;
    }

    public void setSqlBulkCopier(SqlBulkCopier sqlBulkCopier) {
        this.sqlBulkCopier = sqlBulkCopier;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void destroyNats() {
        if (getLog().isDebugEnabled()) {
            getLog().debug("Destroying NATS connection");
        }
        this.natsInitMonitor.enter();
        try {
            try {
                if (this.processReportsSub != null) {
                    this.processReportsSub.unsubscribe();
                    this.processReportsSub = null;
                }
            } catch (Exception e) {
                getLog().error("Failed to close subscription", e);
            }
            try {
                if (this.nats != null) {
                    this.nats.close();
                    this.nats = null;
                }
            } catch (Exception e2) {
                getLog().error("Failed to close NATS connection", e2);
            }
        } finally {
            this.natsInitMonitor.leave();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleNatsReconnect() {
        getLog().warn("Failed to reconnect to NATS, will retry in {}s", getNatsReconnectTimeoutSec());
        getScheduledExecutorServiceFactory().getScheduledExecutorService().schedule(new Runnable() { // from class: com.liquidwarelabs.stratusphere.data.inspection.processor.copy.SqlBulkCopierInspectionDataProcessor.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    SqlBulkCopierInspectionDataProcessor.this.initNats();
                } catch (IOException | InterruptedException | TimeoutException e) {
                    SqlBulkCopierInspectionDataProcessor.this.getLog().error("Nats INIT failed on reconnect", e);
                    SqlBulkCopierInspectionDataProcessor.this.scheduleNatsReconnect();
                }
            }
        }, getNatsReconnectTimeoutSec().longValue(), TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initNats() throws InterruptedException, TimeoutException, IOException {
        this.natsInitMonitor.enter();
        try {
            if (this.nats != null) {
                destroyNats();
            }
            if (getLog().isDebugEnabled()) {
                getLog().debug("Initializing NATS connection");
            }
            this.nats = NatsStreaming.connect(getNatsClusterName(), getClass().getSimpleName(), new Options.Builder().connectionLostHandler(new ConnectionLostHandler() { // from class: com.liquidwarelabs.stratusphere.data.inspection.processor.copy.SqlBulkCopierInspectionDataProcessor.3
                public void connectionLost(StreamingConnection streamingConnection, Exception exc) {
                    SqlBulkCopierInspectionDataProcessor.this.getLog().error("NATS connection lost, attempting reconnection", exc);
                    SqlBulkCopierInspectionDataProcessor.this.destroyNats();
                    try {
                        SqlBulkCopierInspectionDataProcessor.this.initNats();
                    } catch (IOException | InterruptedException | TimeoutException e) {
                        SqlBulkCopierInspectionDataProcessor.this.scheduleNatsReconnect();
                    }
                }
            }).build());
            if (getLog().isDebugEnabled()) {
                getLog().debug("Initializing prcessed reports subscription");
            }
            this.processReportsSub = this.nats.subscribe(getProcessedReportsTopicName(), new MessageHandler() { // from class: com.liquidwarelabs.stratusphere.data.inspection.processor.copy.SqlBulkCopierInspectionDataProcessor.4
                public void onMessage(Message message) {
                    try {
                        if (SqlBulkCopierInspectionDataProcessor.this.getMapJsonProvider() == null) {
                            throw new IllegalStateException("No Map JSON provider available");
                        }
                        if (SqlBulkCopierInspectionDataProcessor.this.getLog().isTraceEnabled()) {
                            SqlBulkCopierInspectionDataProcessor.this.getLog().trace("Received topic message");
                        }
                        Map map = SqlBulkCopierInspectionDataProcessor.this.getMapJsonProvider().toMap(new ByteArrayInputStream(message.getData()));
                        if (SqlBulkCopierInspectionDataProcessor.this.getLog().isTraceEnabled()) {
                            SqlBulkCopierInspectionDataProcessor.this.getLog().trace("Map of size {} created", Integer.valueOf(map.size()));
                        }
                        SqlBulkCopierInspectionDataProcessor.this.processInspectionReportReadEvent(new InspectionReportReadEvent(this, map));
                    } catch (IOException | InterruptedException e) {
                        throw new RuntimeException("Failed to process message", e);
                    }
                }
            }, new SubscriptionOptions.Builder().durableName("processed-reports-processing").deliverAllAvailable().maxInFlight(Runtime.getRuntime().availableProcessors()).build());
        } finally {
            this.natsInitMonitor.leave();
        }
    }

    public void init() {
        if (getLog().isTraceEnabled()) {
            getLog().trace("Initializing");
        }
        loadProperties();
        this.inspectionDataBatches = CacheBuilder.newBuilder().expireAfterAccess(1L, TimeUnit.HOURS).removalListener(removalNotification -> {
            if (getLog().isTraceEnabled()) {
                getLog().trace("Batch for date " + removalNotification.getKey() + " is being expired");
            }
            InspectionDataBatch inspectionDataBatch = (InspectionDataBatch) removalNotification.getValue();
            try {
                if (inspectionDataBatch != null) {
                    try {
                        inspectionDataBatch.commit();
                        inspectionDataBatch.destroy();
                    } catch (IOException | SQLException e) {
                        throw new RuntimeException(e);
                    }
                }
            } catch (Throwable th) {
                inspectionDataBatch.destroy();
                throw th;
            }
        }).build();
        try {
            initNats();
        } catch (IOException | InterruptedException | TimeoutException e) {
            scheduleNatsReconnect();
        }
        this.isInit.set(true);
        synchronized (this.isInit) {
            this.isInit.notifyAll();
        }
    }

    public void destroy() {
        if (getLog().isTraceEnabled()) {
            getLog().trace("Committing open batches on destroy");
        }
        this.inspectionDataBatches.invalidateAll();
        destroyNats();
        super.destroy();
    }

    public String getPropertiesFile() {
        return this.propertiesFile;
    }

    public void setPropertiesFile(String str) {
        this.propertiesFile = str;
    }

    private void processInspectionData(final Map<String, ?> map, String str, List<Map<String, Object>> list, final Map<String, Object> map2) throws IOException {
        if (getLog().isTraceEnabled()) {
            Logger log = getLog();
            Object[] objArr = new Object[4];
            objArr[0] = map.get("name");
            objArr[1] = str;
            objArr[2] = Integer.valueOf(map2.size());
            objArr[3] = Integer.valueOf(list != null ? list.size() : 0);
            log.trace("Processing data from node ID {}/{}, data map size: {}, service map size: {}", objArr);
        }
        Date date = new Date(((Number) map2.get("startDate")).longValue());
        Date date2 = new Date(((Number) map2.get("endDate")).longValue());
        if (getLog().isTraceEnabled()) {
            getLog().trace("Report period is {} to {}", new Object[]{date, date2});
        }
        final InspectionDataBatch[] inspectionDataBatchArr = new InspectionDataBatch[1];
        Date clearDateFields = DateUtil.clearDateFields(date2, new Integer[]{14, 13, 12, 11});
        synchronized (this.inspectionDataBatches) {
            inspectionDataBatchArr[0] = (InspectionDataBatch) this.inspectionDataBatches.getIfPresent(clearDateFields);
            if (inspectionDataBatchArr[0] == null) {
                if (getLog().isTraceEnabled()) {
                    getLog().trace("Initializing new batch for date: {}", clearDateFields);
                }
                inspectionDataBatchArr[0] = new InspectionDataBatch(getScheduledExecutorServiceFactory(), getSqlBulkCopier(), clearDateFields, this.maxBatchSize, this.maxBatchTimeSec, getCopyInConverters());
                this.inspectionDataBatches.put(clearDateFields, inspectionDataBatchArr[0]);
            } else {
                if (getLog().isTraceEnabled()) {
                    getLog().trace("Found existing batch of size {} for date {}", new Object[]{Integer.valueOf(inspectionDataBatchArr[0].getBatchSize()), clearDateFields});
                }
                inspectionDataBatchArr[0].validate();
            }
        }
        map2.put(AbstractSummaryReportCopyInConverter.NODE_INFO_TOKEN, map);
        map2.put(AbstractSummaryReportCopyInConverter.PROCESSED_SERVICES, list);
        getTransactionFactory().getTransaction(false).execute(new TransactionCallbackWithoutResult() { // from class: com.liquidwarelabs.stratusphere.data.inspection.processor.copy.SqlBulkCopierInspectionDataProcessor.5
            protected void doInTransactionWithoutResult(TransactionStatus transactionStatus) {
                try {
                    inspectionDataBatchArr[0].add(map, map2);
                } catch (IOException | SQLException e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }

    @Subscribe
    @AllowConcurrentEvents
    public void processInspectionReportReadEvent(InspectionReportReadEvent inspectionReportReadEvent) throws InterruptedException {
        this.eventMonitor.enterWhen(this.eventGuard);
        try {
            this.activeCount.incrementAndGet();
            if (!this.isInit.get()) {
                synchronized (this.isInit) {
                    if (getLog().isTraceEnabled()) {
                        getLog().trace("Event received but not initialized yet, waiting");
                    }
                    try {
                        this.isInit.wait();
                    } catch (InterruptedException e) {
                        throw new RuntimeException("Interrupted waiting on init()", e);
                    }
                }
            }
            if (getLog().isTraceEnabled()) {
                getLog().trace("Processing event: " + inspectionReportReadEvent);
            }
            Map inspectionDataMap = inspectionReportReadEvent.getInspectionDataMap();
            if (getLog().isTraceEnabled()) {
                getLog().trace("Processing inspection data map, size {}", Integer.valueOf(inspectionDataMap.size()));
            }
            Map<String, ?> map = (Map) inspectionDataMap.get("node");
            String str = (String) inspectionDataMap.get("remoteAddress");
            List<Map<String, Object>> list = (List) inspectionDataMap.get("processedApplicationServices");
            Map<String, Object> map2 = (Map) inspectionDataMap.get("report");
            if (map == null || map.get("id") == null) {
                throw new IllegalArgumentException("Invalid report, nodeId not found");
            }
            try {
                processInspectionData(map, str, list, map2);
            } catch (IOException e2) {
                throw new RuntimeException(e2);
            }
        } finally {
            this.activeCount.decrementAndGet();
        }
    }

    private void loadProperties() {
        if (!StringUtils.isNoneEmpty(new CharSequence[]{getPropertiesFile()}) || getPropertiesUtil() == null) {
            return;
        }
        if (getLog().isTraceEnabled()) {
            getLog().trace("Loading properties from file {}", getPropertiesFile());
        }
        this.maxBatchSize = Integer.parseInt(getPropertiesUtil().getPropertiesValue(getPropertiesFile(), "maxBatchSize", "100"));
        this.maxBatchTimeSec = Integer.parseInt(getPropertiesUtil().getPropertiesValue(getPropertiesFile(), "maxBatchTimeSec", "120"));
        if (getLog().isTraceEnabled()) {
            getLog().trace("Max batch size initialized to {}, time (sec) to {}", Integer.valueOf(this.maxBatchSize), Integer.valueOf(this.maxBatchTimeSec));
        }
        if (this.inspectionDataBatches != null) {
            this.inspectionDataBatches.invalidateAll();
        }
    }

    @Subscribe
    public void inPropertiesChange(PropertiesChangeEvent propertiesChangeEvent) {
        if (StringUtils.isNoneEmpty(new CharSequence[]{getPropertiesFile()}) && propertiesChangeEvent.getPropertiesFile().getAbsolutePath().equals(getPropertiesFile())) {
            if (getLog().isTraceEnabled()) {
                getLog().trace("Reloading properties");
            }
            loadProperties();
        }
    }
}
