/*
 * Decompiled with CFR 0.152.
 */
package com.talpie.linker;

import com.talpie.linker.AES;
import com.talpie.linker.ClientHandler;
import com.talpie.linker.Message;
import com.talpie.linker.RSA;
import com.talpie.linker.StatiCom;
import com.talpie.linker.StreamFrame;
import java.io.EOFException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class StreamSocketServer {
    private final ClientHandler ch;
    private final Socket socket;
    private final String streamId;
    private final ExecutorService rxExec = Executors.newSingleThreadExecutor();
    private final ExecutorService txExec = Executors.newSingleThreadExecutor();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private volatile boolean running = false;
    private RSA rsa;
    private AES aes;
    private OutputStream out;
    private InputStream in;
    private final BlockingQueue<StreamFrame> txQueue;
    private long rxFrames = 0L;
    private long rxBytes = 0L;
    private long txFrames = 0L;
    private long txBytes = 0L;

    public String getStreamId() {
        return this.streamId;
    }

    public StreamSocketServer(ClientHandler ch, Socket socket, String streamId, int queueSize) {
        this.ch = ch;
        this.socket = socket;
        this.streamId = streamId;
        this.rsa = ch.getRsa();
        this.aes = ch.getAes();
        this.txQueue = new ArrayBlockingQueue<StreamFrame>(queueSize);
        try {
            socket.setTcpNoDelay(true);
            socket.setKeepAlive(true);
            socket.setReceiveBufferSize(0x100000);
            socket.setSendBufferSize(0x100000);
        }
        catch (Exception e) {
            ch.getServerService().getListenersHandlers().error(this, e);
        }
    }

    public void start() {
        try {
            this.out = this.socket.getOutputStream();
            this.in = this.socket.getInputStream();
            if (!this.handshake()) {
                this.stop();
                return;
            }
            this.running = true;
            this.ch.getServerService().getListenersHandlers().streamOpen(this.ch.getServerService(), this.ch, this, this.streamId);
            this.rxExec.submit(this::rxLoop);
            this.txExec.submit(this::txLoop);
        }
        catch (Exception e) {
            this.ch.getServerService().getListenersHandlers().error(this, e);
            this.stop();
        }
    }

    public void stop() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        this.running = false;
        try {
            this.txQueue.clear();
            this.rxExec.shutdownNow();
            this.txExec.shutdownNow();
            if (this.out != null) {
                this.out.close();
            }
            if (this.in != null) {
                this.in.close();
            }
            if (this.socket != null) {
                this.socket.close();
            }
            this.ch.getServerService().removeActiveStream(this.streamId, this);
            this.ch.getServerService().getListenersHandlers().streamClose(this.ch.getServerService(), this.ch, this, this.streamId);
        }
        catch (Exception e) {
            this.ch.getServerService().getListenersHandlers().error(this, e);
        }
    }

    public boolean offerFrame(StreamFrame f) {
        if (this.txQueue.offer(f)) {
            return true;
        }
        this.txQueue.poll();
        return this.txQueue.offer(f);
    }

    private boolean handshake() {
        try {
            StatiCom.writeLine(this.out, this.aes.encrypt("#_STREAM-SOCKET-READY++"));
            this.ch.getServerService().getListenersHandlers().streamHandshakeCompleted(this.ch.getServerService(), this.ch, this, this.streamId);
            return true;
        }
        catch (Exception e) {
            this.ch.getServerService().getListenersHandlers().streamHandshakeFailed(this.ch.getServerService(), this.ch, this, this.streamId, e);
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void rxLoop() {
        try {
            while (this.running) {
                String encHeaderStr = StatiCom.readLineString(this.in);
                if (encHeaderStr == null) {
                    break;
                }
                byte[] encHeader = Base64.getDecoder().decode(encHeaderStr);
                byte[] headerPlain = this.aes.decrypt(encHeader, null);
                Message msg = Message.fromHeader(new String(headerPlain, StandardCharsets.UTF_8));
                int total = msg.getLength().intValueExact();
                byte[] encPayload = total > 0 ? StreamSocketServer.readPayload(this.in, total) : new byte[]{};
                byte[] plain = total > 0 ? this.aes.decrypt(encPayload, encHeader) : new byte[]{};
                msg.setPayload(plain);
                ++this.rxFrames;
                this.rxBytes += (long)encHeaderStr.length() + 1L + (long)encPayload.length;
                this.ch.getServerService().getListenersHandlers().streamProgressRx(this.ch.getServerService(), this.ch, this, this.rxFrames, this.rxBytes);
                String route = msg.getRoute();
                if (route.equals("#_STREAM++/CLOSE")) break;
                if (route.equals("#_STREAM++/CLOSE-ACK")) {
                    break;
                }
                if (route.equals("#_STREAM++/META/" + this.streamId)) {
                    this.ch.getServerService().getListenersHandlers().streamMeta(this.ch.getServerService(), this.ch, this, this.streamId, plain);
                    continue;
                }
                if (route.equals("#_STREAM++/DATA/" + this.streamId)) {
                    StreamFrame f = StreamFrame.decode(this.streamId, plain);
                    this.ch.getServerService().getListenersHandlers().streamFrameIn(this.ch.getServerService(), this.ch, this, f, (long)encHeaderStr.length() + 1L + (long)encPayload.length);
                    continue;
                }
                this.ch.getServerService().getListenersHandlers().error(this, new IllegalStateException("Unexpected route: " + route));
            }
        }
        catch (Exception e) {
            this.ch.getServerService().getListenersHandlers().error(this, e);
        }
        finally {
            this.stop();
        }
    }

    private void txLoop() {
        try {
            while (this.running) {
                StreamFrame f = this.txQueue.take();
                Message m = new Message("#_STREAM++/DATA/" + this.streamId, StreamFrame.encode(f));
                long bytes = this.writeMessageCountOnWire(m);
                ++this.txFrames;
                this.txBytes += bytes;
                this.ch.getServerService().getListenersHandlers().streamFrameOut(this.ch.getServerService(), this.ch, this, f, bytes);
                this.ch.getServerService().getListenersHandlers().streamProgressTx(this.ch.getServerService(), this.ch, this, this.txFrames, this.txBytes);
            }
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            this.ch.getServerService().getListenersHandlers().error(this, e);
        }
    }

    private static byte[] readPayload(InputStream in, int total) throws Exception {
        int got;
        byte[] buf = new byte[total];
        for (int off = 0; off < total; off += got) {
            got = in.read(buf, off, Math.min(4096, total - off));
            if (got != -1) continue;
            throw new EOFException();
        }
        return buf;
    }

    private long writeMessageCountOnWire(Message msg) throws Exception {
        byte[] plain = msg.getPayload();
        int encLen = this.aes.encryptedLength(plain.length);
        byte[] saved = plain;
        msg.setPayload(new byte[encLen]);
        String headerStr = msg.getHeader();
        msg.setPayload(saved);
        String encHeaderStr = this.aes.encrypt(headerStr);
        StatiCom.writeLine(this.out, encHeaderStr);
        byte[] encHeader = Base64.getDecoder().decode(encHeaderStr);
        byte[] encPayload = this.aes.encrypt(plain, encHeader);
        msg.setPayload(encPayload);
        this.out.write(encPayload);
        this.out.flush();
        return (long)encHeaderStr.length() + 1L + (long)encPayload.length;
    }
}

