package org.apache.seatunnel.flink.doris.sink;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Base64;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/flink/doris/sink/DorisStreamLoad.class */
/**
 * Sends batches of text data to a Doris {@code _stream_load} HTTP endpoint.
 *
 * <p>Each batch is sent with HTTP {@code PUT} and Basic authentication. The first request goes to
 * the URL built from the host, database and table given to the constructor and is expected to
 * answer with {@code 307 Temporary Redirect} plus a {@code Location} header (presumably the Doris
 * frontend pointing at a backend node); the payload is then written to that location.
 *
 * <p>The instance is {@link Serializable}; all of its fields are plain strings or
 * {@link Properties}.
 */
public class DorisStreamLoad implements Serializable {
    private static final long serialVersionUID = -595233501819950489L;
    private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?";
    private final String loadUrlStr;
    private final String authEncoding;
    private final Properties streamLoadProp;
    private static final Logger LOGGER = LoggerFactory.getLogger(DorisStreamLoad.class);
    /** Values of the response {@code status} field that are accepted as a successful load. */
    private static final List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout");
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Raw outcome of one stream load HTTP call: status code, reason phrase and response body. */
    public static class LoadResponse {
        public int status;
        public String respMsg;
        public String respContent;

        /**
         * @param i    HTTP status code of the response
         * @param str  HTTP reason phrase of the response
         * @param str2 response body, with its lines concatenated
         */
        public LoadResponse(int i, String str, String str2) {
            this.status = i;
            this.respMsg = str;
            this.respContent = str2;
        }

        @Override
        public String toString() {
            return "status: " + this.status + ", resp msg: " + this.respMsg + ", resp content: " + this.respContent;
        }
    }

    /**
     * Builds the load URL and the Basic-auth credential once, so they can be reused for every batch.
     *
     * @param str        {@code host[:port]} substituted into the load URL
     * @param str2       first path segment after {@code /api/} (the database)
     * @param str3       second path segment after {@code /api/} (the table)
     * @param str4       user name for Basic authentication
     * @param str5       password for Basic authentication
     * @param properties extra entries that are sent as HTTP request headers on every request
     */
    public DorisStreamLoad(String str, String str2, String str3, String str4, String str5, Properties properties) {
        this.loadUrlStr = String.format(LOAD_URL_PATTERN, str, str2, str3);
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", str4, str5).getBytes(StandardCharsets.UTF_8));
        this.streamLoadProp = properties;
    }

    /**
     * @return the properties passed to the constructor (the same instance, not a copy)
     */
    public Properties getStreamLoadProp() {
        return this.streamLoadProp;
    }

    /**
     * Opens a {@code PUT} connection to the given URL, pre-configured with authentication, the load
     * label and every configured stream load property as a request header.
     *
     * @param str  target URL
     * @param str2 value of the {@code label} request header
     * @return a connection that has not yet been sent
     * @throws IOException if the URL is malformed or the connection cannot be opened
     */
    private HttpURLConnection getConnection(String str, String str2) throws IOException {
        HttpURLConnection httpURLConnection = (HttpURLConnection) new URL(str).openConnection();
        // The redirect is followed manually in loadBatch so the payload goes to the redirect target.
        httpURLConnection.setInstanceFollowRedirects(false);
        httpURLConnection.setRequestMethod("PUT");
        httpURLConnection.setRequestProperty("Authorization", "Basic " + this.authEncoding);
        httpURLConnection.addRequestProperty("Expect", "100-continue");
        httpURLConnection.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        httpURLConnection.addRequestProperty("label", str2);
        for (Map.Entry<Object, Object> entry : this.streamLoadProp.entrySet()) {
            httpURLConnection.addRequestProperty(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
        }
        httpURLConnection.setDoOutput(true);
        httpURLConnection.setDoInput(true);
        return httpURLConnection;
    }

    /**
     * Sends one batch and verifies that it was loaded completely.
     *
     * @param str payload to load
     * @throws RuntimeException if the HTTP exchange fails, the HTTP status is not 200, the response
     *                          body cannot be parsed, the reported status is not an accepted success
     *                          status, or the loaded row count differs from the total row count
     */
    public void load(String str) {
        LoadResponse loadBatch = loadBatch(str);
        LOGGER.info("Streamload Response:{}", loadBatch);
        if (loadBatch.status != HttpResponseStatus.OK.code()) {
            throw new RuntimeException("stream load error: " + loadBatch.respContent);
        }
        RespContent respContent;
        try {
            respContent = OBJECT_MAPPER.readValue(loadBatch.respContent, RespContent.class);
        } catch (IOException e) {
            throw new RuntimeException("stream load error: cannot parse response: " + loadBatch.respContent, e);
        }
        // HTTP 200 alone is not enough: the body carries the real load status and the row counts.
        boolean statusOk = DORIS_SUCCESS_STATUS.contains(respContent.getStatus());
        if (!statusOk || respContent.getNumberTotalRows() != respContent.getNumberLoadedRows()) {
            throw new RuntimeException(String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL()));
        }
    }

    /**
     * Performs the two-step HTTP exchange for one batch under a freshly generated label.
     *
     * @param str payload to load; sent UTF-8 encoded, matching the declared request charset
     * @return status code, reason phrase and body of the response to the payload request
     * @throws RuntimeException if the first response is not a 307 with a {@code Location} header
     *                          or any I/O error occurs; the original exception is kept as the cause
     */
    private LoadResponse loadBatch(String str) {
        String label = String.format("flink_sink_%s_%s", new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date()), UUID.randomUUID().toString().replace("-", ""));
        HttpURLConnection redirectConn = null;
        HttpURLConnection loadConn = null;
        try {
            redirectConn = getConnection(this.loadUrlStr, label);
            int redirectStatus = redirectConn.getResponseCode();
            if (redirectStatus != HttpResponseStatus.TEMPORARY_REDIRECT.code()) {
                throw new IOException("status is not TEMPORARY_REDIRECT 307, status: " + redirectStatus);
            }
            String location = redirectConn.getHeaderField("Location");
            if (location == null) {
                throw new IOException("redirect location is null");
            }
            loadConn = getConnection(location, label);
            try (BufferedOutputStream out = new BufferedOutputStream(loadConn.getOutputStream())) {
                out.write(str.getBytes(StandardCharsets.UTF_8));
            }
            int status = loadConn.getResponseCode();
            String respMsg = loadConn.getResponseMessage();
            return new LoadResponse(status, respMsg, readBody(loadConn, status));
        } catch (Exception e) {
            String errMsg = "failed to stream load data with label:" + label;
            LOGGER.warn(errMsg, e);
            throw new RuntimeException("stream load error: " + errMsg, e);
        } finally {
            // Always release both connections, on success and on failure.
            if (redirectConn != null) {
                redirectConn.disconnect();
            }
            if (loadConn != null) {
                loadConn.disconnect();
            }
        }
    }

    /**
     * Reads the whole response body as UTF-8 text, concatenating lines without separators.
     *
     * <p>For error statuses (>= 400) the error stream is read, so the server's message is available
     * to the caller instead of being lost to an exception from the regular input stream.
     *
     * @param conn   connection whose request has already been sent
     * @param status HTTP status code already obtained from {@code conn}
     * @return the body text, or an empty string when the connection offers no body stream
     * @throws IOException if reading the stream fails
     */
    private static String readBody(HttpURLConnection conn, int status) throws IOException {
        InputStream body = status >= HttpURLConnection.HTTP_BAD_REQUEST ? conn.getErrorStream() : conn.getInputStream();
        if (body == null) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(body, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line);
            }
        }
        return sb.toString();
    }
}
