腾讯云知识

腾讯云API调用示例

Java package com.pingan.pacloud.api.demo; import java.util.UUID; import org.apache.commons.codec.binary.Base64; import org.junit.Test; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import java.io.UnsupportedEncodingExcept

Java
 
    package com.pingan.pacloud.api.demo;
   
    import java.util.UUID;
    import org.apache.commons.codec.binary.Base64;
    import org.junit.Test;
    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;
    import java.io.UnsupportedEncodingException;
    import java.net.URLEncoder;
    import java.security.InvalidKeyException;
    import java.security.NoSuchAlgorithmException;
    import java.util.*;
   
    public class ApiSignatureDemo {
       
        @Test
        public void apiSignatureDemo() {
           
            // 请求的url
            String url = "https://api.yun.pingan.com/api/v1";
            // 配置AccessKey,替换此处即可
            String accessKeyId = "用户的AccessKeyId";
            String accessKeySecret = "用户的AccessKeySecret";
            // 构造请求参数
            Map<String, String> paramMap = new HashMap<>();
            //公共参数,这部分参数取值基本固定,只需替换accessKeyId
            paramMap.put("version", "2017-01-01");
            paramMap.put("signatureMethod", "HMAC-SHA256");
            paramMap.put("signatureVersion", "1.0");
            paramMap.put("timestamp", Long.toString(System.currentTimeMillis()));
            paramMap.put("signatureNonce", UUID.randomUUID().toString().replaceAll("-", ""));
            paramMap.put("accessKeyId", accessKeyId);
            // 业务参数,以获取操作审计记录为例
            paramMap.put("Action", "ListBizLogs");
            paramMap.put("PageNumber", "1");
            paramMap.put("PageSize", "10");
            paramMap.put("startTime", "2019-01-13 00:00:00");
            // 获取要被签名的参数串,参数要做特殊处理
            String paramStrToSign = prepareParamStrToSign(paramMap);
            // 对参数进行签名
            String signature = getParamSignature(accessKeySecret, paramStrToSign);
            paramMap.put("signature", signature);
            // 拼接完整的请求参数串
            String requestParamStr = getRequestParamStr(paramMap);
            // 拼接完整的请求url
            String finalRequestUrl = url + "?" + requestParamStr;
            System.out.println(finalRequestUrl);
            // TODO 使用各类请求工具发起请求,具体实现略
        }
       
        private String getRequestParamStr(Map<String, String> paramMap) {
            Iterator<Map.Entry<String, String>> iterator = paramMap.entrySet().iterator();
            StringBuilder paramStrBuilder = new StringBuilder();
            while(iterator.hasNext()) {
                Map.Entry<String, String> entry = iterator.next();
                String paramName = entry.getKey();
                String paramValue = entry.getValue();
                paramStrBuilder.append(paramName).append("=").append(paramValue).append("&");
            }
            return paramStrBuilder.toString().substring(0, paramStrBuilder.length() - 1);
        }
       
        private String prepareParamStrToSign(Map<String, String> paramMap) {
       
            List<String> paramToSignList = new ArrayList<>();
            Iterator<Map.Entry<String, String>> iterator = paramMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, String> entry = iterator.next();
                String paramName = entry.getKey();
                String paramValue = entry.getValue();
                try {
                    String encodedParamValue = URLEncoder.encode(paramValue, "UTF-8").replace("+", "%20")
                            .replace("*", "%2A")
                            .replace("%7E", "~")
                            .replace(":", "%3A")
    .replace(",", "%2C");
                    paramToSignList.add(paramName.toLowerCase() + "=" + encodedParamValue.toLowerCase());
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
            Collections.sort(paramToSignList);
            StringBuilder paramStrToSignBuilder = new StringBuilder();
            for (int i = 0; i < paramToSignList.size(); ++i) {
                paramStrToSignBuilder.append(paramToSignList.get(i));
                if (i != paramToSignList.size() - 1) {
                    paramStrToSignBuilder.append("&");
                }
            }
            return paramStrToSignBuilder.toString();
        }
       
        private String getParamSignature(String accessKeySecret, String paramStrToSign) {
            Mac mac = null;
            try {
                mac = Mac.getInstance("HmacSHA256");
            } catch (NoSuchAlgorithmException e) {
                e.printStackTrace();
            }
            SecretKeySpec keySpec = new SecretKeySpec(accessKeySecret.getBytes(), "HmacSHA256");
            try {
                mac.init(keySpec);
            } catch (InvalidKeyException e) {
                e.printStackTrace();
            }
            mac.update(paramStrToSign.getBytes());
            byte[] encryptedBytes = mac.doFinal();
            return Base64.encodeBase64String(encryptedBytes).replace("+", "%2B").replace("\r\n", "");
        }
    }
   
Python2.X
 
    # -*- coding:UTF-8 -*-
    import requests
    import time
    import random
    import hmac
    import hashlib
    import base64
    import urllib
    import json
   
    class PaCloudClient:
   
        __endpoint = ""
        __accessKeyId = ""
        __accessKeySecret = ""
        __algorithm = "HMAC-SHA256"
        __signatureVersion = '1.0'
        __version = '2017-01-01'
        __headers = {"Accept-Language":"zh-CN,zh"}
   
        def __init__(self, endpoint):
            self.__endpoint = endpoint
   
        def __calculate_signature(self, accessKeySecret, dataToSign, signMethod):
            """
            calculate signature of raw data with access key secret
            """
            digest = hmac.new(accessKeySecret.encode(), dataToSign.encode(), signMethod).digest()
            return base64.b64encode(digest)
       
        def __get_signature(self, accessKeySecret, dataToSign):
            """
            Sign the data with access key secret
            """
            signature = self.__calculate_signature(accessKeySecret, dataToSign, hashlib.sha256)
            return signature
                
        def __get_signature_nonce(self):
            """"
            Return a nonce value for signing data, which is used to guarantee uniqueness for the request
            """
            signaturenonce = ''
            for i in range(19):
                signaturenonce = signaturenonce + str(random.randint(1, 9))
            return signaturenonce
   
        def __lower_params(self, paramDict):
            """
            Convert all key in param dict to lower case
            """
            paramDictNew = {}
            for k, v in paramDict.items():
                paramDictNew[k.lower()] = v
            return paramDictNew
   
        def __add_public_params(self, paramDict):
            """
            Put required public paramters to param dict for signing data
            """
            paramDict['accessKeyId'] = self.__accessKeyId
            paramDict['signatureMethod'] = self.__algorithm
            paramDict['signatureVersion'] = self.__signatureVersion
            paramDict['version'] = self.__version
            paramDict['timestamp'] = self.__get_timestamp_str()
            paramDict['signatureNonce'] = self.__get_signature_nonce()
            return paramDict
   
        def __build_data_to_sign(self, paramDict):
            """
            Build raw data according to certain rules, raw data will be signed before send request
            1. Convert all key in param dict to lower case
            2. Sort the key in alphabetical order
            3. Encode each value in param dict according to url encoding
            4. Convert each pair of '{key:value}' in param dict to 'key=value' and link each pair with '&'
               for example:
                   before convert param dict: {"bkey1":"value1", "akey2": "value2", "ckey3": "value3"}
                   after convert param dict: akey2=value2&bkey1=value1&ckey3=value3
            """
            data = ''
            lower_params = self.__lower_params(paramDict)
            for key in sorted(lower_params.keys()):
                paramValue = str(lower_params[key])
                encodedParamValue = urllib.quote(paramValue)
                encodedParamValueToLower = encodedParamValue.lower()
                if data:
                    data = data + '&' +  key.lower() + '=' + encodedParamValueToLower
                else:
                    data = key.lower() + '=' + encodedParamValueToLower
            return data
       
        def __get_timestamp_str(self):
            """
            Get current timestamp string
            """
            return "%d" % (time.time()*1000)
   
        def initAccessKey(self, accessKeyId, accessKeySecret):
            """
            Set user's AccessKey
            Params:
                accessKeyId: Id of access key
                accessKeySecret: Secret of access key
            """
            self.__accessKeyId = accessKeyId
            self.__accessKeySecret = accessKeySecret
   
        def setSignAlgorithm(self, algorithm):
            """
            Set algorithm for signature
            Params:
                algotithm: 'HMAC-SHA1' or 'HMAC-SHA256' is required, default value 'HMAC-SHA256' is recommended
            """
            self.__algorithm = algorithm
   
        def setSignatureVersion(self, signatureVersion):
            """
            Set signature version of request
            Params:
                signatureVersion: Default value '1.0'
            """
            self.__signatureVersion = signatureVersion
   
        def setVersion(self, version):
            """
            Set version of request
            Params:
                version: Default value '2017-01-01'
            """
            self.__version = version
       
        def setHeaders(self, headers):
            """
            Set headers of request
            Params:
                headers: a dict which contains headers
            """
            self.__headers = headers
       
        def doAction(self, paramDict):
            """
            Request server for a certain api
            Params:
                paramDict: a dict which contain all required parameter for the certain api
            Return:
                Response json string
            """
            paramDict = self.__add_public_params(paramDict)
            dataToSign = self.__build_data_to_sign(paramDict)
            print("===========================================================================")       
            print(">> Param String To Signature:")
            print("%s" % dataToSign)
            print("------------------------------------------")       
            signature = self.__get_signature(self.__accessKeySecret, dataToSign).replace("\r\n", "")
   
            print(">> Signature:")
            print("%s" % signature)
            print("------------------------------------------")       
            paramDict['signature'] = signature
            res = requests.get(self.__endpoint, paramDict, headers = self.__headers)
           
            print(">> Request:")       
            print("%s" % res.url)       
            print("------------------------------------------")       
            print(">> Response")
            print("%s" % json.dumps(res.json(), indent = 4, ensure_ascii=False))
            print("===========================================================================")       
            return res.json()
   
    def main():
        endpoint = 'https://api.yun.pingan.com/api/v1'
        accessKeyId = '用户的AccessKeyId'
        accessKeySecret = '用户的AccessKeySecret'
        algorithm = "HMAC-SHA256"
        signatureVerion = "1.0"
        version = "2017-01-01"
        #headers = {"Accept-Language":"en-US,en"}
   
        params = {
            #业务参数
            "action": "RunInstances",
        }
        paclient = PaCloudClient(endpoint)
        paclient.initAccessKey(accessKeyId, accessKeySecret)
        paclient.setSignAlgorithm(algorithm)
        paclient.setVersion(version)
        paclient.setSignatureVersion(signatureVerion)
        #paclient.setHeaders(headers)
        paclient.doAction(params)
   
    if __name__ == '__main__':
        main()
   
   
Python3.x
 
    # -*- coding:UTF-8 -*-
    import requests
    import time
    import random
    import hmac
    import hashlib
    import base64
    import urllib
    import json
   
    class PaCloudClient:
   
        __endpoint = ""
        __accessKeyId = ""
        __accessKeySecret = ""
        __algorithm = "HMAC-SHA256"
        __signatureVersion = '1.0'
        __version = '2017-01-01'
        __headers = {"Accept-Language":"zh-CN,zh"}
   
        def __init__(self, endpoint):
            self.__endpoint = endpoint
   
           
        def __calculate_signature(self, accessKeySecret, dataToSign, signMethod):
            """
            calculate signature of raw data with access key secret
            """
            digest = hmac.new(accessKeySecret.encode(), dataToSign.encode(), signMethod).digest()
            return base64.b64encode(digest)
       
        def __get_signature(self, accessKeySecret, dataToSign):
            """
            Sign the data with access key secret
            """
            signature = self.__calculate_signature(accessKeySecret, dataToSign, hashlib.sha256)
            return signature
   
        def __get_signature_nonce(self):
            """"
            Return a nonce value for signing data, which is used to guarantee uniqueness for the request
            """
            signaturenonce = ''
            for i in range(19):
                signaturenonce = signaturenonce + str(random.randint(1, 9))
            return signaturenonce
   
        def __lower_params(self, paramDict):
            """
            Convert all key in param dict to lower case
            """
            paramDictNew = {}
            for k, v in paramDict.items():
                paramDictNew[k.lower()] = v
            return paramDictNew
   
        def __add_public_params(self, paramDict):
            """
            Put required public paramters to param dict for signing data
            """
            paramDict['accessKeyId'] = self.__accessKeyId
            paramDict['signatureMethod'] = self.__algorithm
            paramDict['signatureVersion'] = self.__signatureVersion
            paramDict['version'] = self.__version
            paramDict['timestamp'] = self.__get_timestamp_str()
            paramDict['signatureNonce'] = self.__get_signature_nonce()
            return paramDict
   
        def __build_data_to_sign(self, paramDict):
            """
            Build raw data according to certain rules, raw data will be signed before send request
            1. Convert all key in param dict to lower case
            2. Sort the key in alphabetical order
            3. Encode each value in param dict according to url encoding
            4. Convert each pair of '{key:value}' in param dict to 'key=value' and link each pair with '&'
               for example:
                   before convert param dict: {"bkey1":"value1", "akey2": "value2", "ckey3": "value3"}
                   after convert param dict: akey2=value2&bkey1=value1&ckey3=value3
            """
            data = ''
            lower_params = self.__lower_params(paramDict)
            for key in sorted(lower_params.keys()):
                paramValue = str(lower_params[key])
                encodedParamValue = urllib.parse.quote(paramValue)
                encodedParamValueToLower = encodedParamValue.lower()
                if data:
                    data = data + '&' + key.lower() + '=' + encodedParamValueToLower
                else:
                    data = key.lower() + '=' + encodedParamValueToLower
            return data
   
        def __get_timestamp_str(self):
            """
            Get current timestamp string
            """
            return "%d" % (time.time()*1000)
   
        def initAccessKey(self, accessKeyId, accessKeySecret):
            """
            Set user's AccessKey
            Params:
                accessKeyId: Id of access key
                accessKeySecret: Secret of access key
            """
            self.__accessKeyId = accessKeyId
            self.__accessKeySecret = accessKeySecret
   
        def setSignAlgorithm(self, algorithm):
            """
            Set algorithm for signature
            Params:
                algotithm: 'HMAC-SHA1' or 'HMAC-SHA256' is required, default value 'HMAC-SHA256' is recommended
            """
            self.__algorithm = algorithm
   
        def setSignatureVersion(self, signatureVersion):
            """
            Set signature version of request
            Params:
                signatureVersion: Default value '1.0'
            """
            self.__signatureVersion = signatureVersion
   
        def setVersion(self, version):
            """
            Set version of request
            Params:
                version: Default value '2017-01-01'
            """
            self.__version = version
       
        def setHeaders(self, headers):
            """
            Set headers of request
            Params:
                headers: a dict which contain headers
            """
            self.__headers = headers
   
        def doAction(self, paramDict):
            """
            Request server for a certain api
            Params:
                paramDict: a dict which contain all required parameter for the certain api
            Return:
                Response json string
            """
            paramDict = self.__add_public_params(paramDict)
            dataToSign = self.__build_data_to_sign(paramDict)
            print("===========================================================================")
            print(">> Param String To Signature:")
            print("%s" % dataToSign)
            print("------------------------------------------")
            signature = self.__get_signature(self.__accessKeySecret, dataToSign)
   
            print(">> Signature:")
            print("%s" % signature)
            print("------------------------------------------")
            paramDict['signature'] = signature
            res = requests.get(self.__endpoint, paramDict, headers = self.__headers)
   
            print(">> Request:")    
            print("%s" % res.url)
            print("------------------------------------------")
            print(">> Response")
            print("%s" % json.dumps(res.json(), indent = 4, ensure_ascii = False))
            print("===========================================================================")
            return res.json()
   
    def main():
        endpoint = 'https://api.yun.pingan.com/api/v1'
        accessKeyId = '用户AccessKeyId'
        accessKeySecret = '用户AccessKeySecret'
        algorithm = "HMAC-SHA256"
        signatureVerion = "1.0"
        version = "2017-01-01"
   
        params = {
            "action": "GetUser",
        }
        paclient = PaCloudClient(endpoint)
        paclient.initAccessKey(accessKeyId, accessKeySecret)
        paclient.setSignAlgorithm(algorithm)
        paclient.setVersion(version)
        paclient.setSignatureVersion(signatureVerion)
        paclient.doAction(params)
   
    if __name__ == '__main__':
        main()



上一篇:腾讯云API如何排查后端服务校验签名失败 下一篇:腾讯云API公共错误码说明

精彩导读