/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.sql.runtime.datasource.socket.trident;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.storm.Config;
import org.apache.storm.spout.Scheme;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Trident batch spout that reads newline-delimited text from a TCP socket and
 * emits each line as a tuple produced by the configured {@link Scheme}.
 *
 * <p>A background thread reads lines from the socket into an in-memory queue;
 * {@link #emitBatch(long, TridentCollector)} drains that queue into a batch.
 * Emitted batches are remembered by batch id until {@link #ack(long)} is
 * called so that a re-emit of the same id replays the same tuples.
 *
 * <p>If the socket fails or reaches EOF while the spout is still running, the
 * reader thread terminates the JVM with exit code 11.
 */
public class TridentSocketSpout implements IBatchSpout {
    private static final Logger LOG = LoggerFactory.getLogger(TridentSocketSpout.class);
    private final String host;
    private final int port;
    private final Scheme scheme;
    /** Cleared by {@link #close()}; read by the reader thread, hence volatile. */
    private volatile boolean _running = true;
    /** Lines handed over from the reader thread to {@code emitBatch}. */
    private BlockingDeque<String> queue;
    private Socket socket;
    private Thread readerThread;
    private BufferedReader in;
    private ObjectMapper objectMapper;
    /** Batches emitted but not yet acked, keyed by batch id. */
    private Map<Long, List<List<Object>>> batches;

    /**
     * Creates a spout that will connect to {@code host:port} when opened.
     *
     * @param scheme converts each received line into a tuple and supplies the output fields
     * @param host   host name or address to connect to
     * @param port   TCP port to connect to
     */
    public TridentSocketSpout(Scheme scheme, String host, int port) {
        this.scheme = scheme;
        this.host = host;
        this.port = port;
    }

    /**
     * Initializes internal state, connects to the socket and starts the reader thread.
     *
     * @param conf    topology configuration (not used by this spout)
     * @param context topology context (not used by this spout)
     * @throws RuntimeException if the socket cannot be opened; the underlying
     *                          {@link IOException} is attached as the cause
     */
    public void open(Map conf, TopologyContext context) {
        this.queue = new LinkedBlockingDeque<String>();
        this.objectMapper = new ObjectMapper();
        this.batches = new HashMap<Long, List<List<Object>>>();
        try {
            this.socket = new Socket(this.host, this.port);
            this.in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
        }
        catch (IOException e) {
            // The connection may have succeeded before the stream lookup failed;
            // release it so a failed open does not leak the socket.
            this.closeQuietly(this.socket);
            this.socket = null;
            throw new RuntimeException("Error opening socket: host " + this.host + " port " + this.port, e);
        }
        this.readerThread = new Thread(new SocketReaderRunnable());
        this.readerThread.start();
    }

    /**
     * Emits the batch for {@code batchId}. The first time an id is seen, every
     * line currently queued is converted and stored as that batch; later calls
     * with the same id re-emit the stored tuples.
     *
     * @param batchId   identifier of the batch being emitted
     * @param collector collector that receives the tuples
     */
    public void emitBatch(long batchId, TridentCollector collector) {
        List<List<Object>> batch = this.batches.get(batchId);
        if (batch == null) {
            batch = new ArrayList<List<Object>>();
            String line;
            // poll() returns null once the queue is empty, which ends the drain.
            while ((line = this.queue.poll()) != null) {
                List<Object> values = this.convertLineToTuple(line);
                // Lines the scheme maps to null are dropped.
                if (values != null) {
                    batch.add(values);
                }
            }
            this.batches.put(batchId, batch);
        }
        for (List<Object> tuple : batch) {
            collector.emit(tuple);
        }
    }

    /**
     * Converts one received line into a tuple using the configured scheme.
     *
     * @param line a line read from the socket
     * @return whatever the scheme's {@code deserialize} returns for the line's bytes
     */
    private List<Object> convertLineToTuple(String line) {
        // NOTE(review): both the socket reader and getBytes() rely on the platform
        // default charset; consider an explicit charset if non-ASCII data is expected.
        return this.scheme.deserialize(ByteBuffer.wrap(line.getBytes()));
    }

    /**
     * Forgets the stored tuples for an acknowledged batch.
     *
     * @param batchId identifier of the acknowledged batch
     */
    public void ack(long batchId) {
        this.batches.remove(batchId);
    }

    /**
     * Stops the reader thread and releases the socket. Safe to call even when
     * {@link #open(Map, TopologyContext)} was never called or did not complete.
     */
    public void close() {
        this._running = false;
        if (this.readerThread != null) {
            this.readerThread.interrupt();
        }
        if (this.queue != null) {
            this.queue.clear();
        }
        this.closeQuietly(this.in);
        this.closeQuietly(this.socket);
    }

    /**
     * Returns the component configuration for this spout.
     *
     * @return a configuration with max task parallelism set to 1
     */
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.setMaxTaskParallelism(1);
        return conf;
    }

    /**
     * Returns the output fields declared by the configured scheme.
     *
     * @return the scheme's output fields
     */
    public Fields getOutputFields() {
        return this.scheme.getOutputFields();
    }

    /**
     * Handles a failure raised in the reader thread. While the spout is running,
     * or whenever the failure is an {@link Error}, the failure is logged and the
     * JVM is terminated with exit code 11. A non-{@code Error} failure seen after
     * {@link #close()} is only logged, since closing the socket is expected to
     * interrupt the pending read.
     *
     * @param t the failure caught by the reader thread
     */
    private void die(Throwable t) {
        if (!this._running && !(t instanceof Error)) {
            LOG.info("TridentSocketSpout reader stopped after the spout was closed.", t);
            return;
        }
        LOG.error("Halting process: TridentSocketSpout died.", t);
        System.exit(11);
    }

    /**
     * Closes the given resource, ignoring {@code null} and any {@link IOException}.
     *
     * @param closeable resource to close; may be {@code null}
     */
    private void closeQuietly(Closeable closeable) {
        try {
            if (closeable != null) {
                closeable.close();
            }
        }
        catch (IOException ignored) {
            // Best-effort cleanup: nothing useful can be done if close fails.
        }
    }

    /**
     * Reads lines from the socket and queues them until the spout stops running.
     */
    private class SocketReaderRunnable implements Runnable {
        private SocketReaderRunnable() {
        }

        @Override
        public void run() {
            while (TridentSocketSpout.this._running) {
                try {
                    String line = TridentSocketSpout.this.in.readLine();
                    if (line == null) {
                        throw new RuntimeException("EOF reached from the socket. We can't read the data any more.");
                    }
                    // offer() appends at the tail so emitBatch's poll() (head removal)
                    // sees lines in arrival order; push() would insert at the head.
                    TridentSocketSpout.this.queue.offer(line.trim());
                }
                catch (Throwable t) {
                    TridentSocketSpout.this.die(t);
                }
            }
        }
    }
}

