zezhiya 发表于 7 天前

算能 SophNet 免费 DeepSeek R1 满血 API

今天又发现一家免费的第三方 DeepSeek R1 API,注册就有免费额度 50¥(现在缩减到 20¥了,而且多了个邀请码机制,不过被邀请的没有额外额度
https://sophnet.com/#/model/overview
官网标参数如下:

实测速度非常快,比华为云和腾讯云 LKEAP 都快,推理信息放在 reasoning_content 字段。
懒得注册的自己玩,附上脚本
import requests
import time
from concurrent.futures import ThreadPoolExecutor
import logging
from datetime import datetime

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(message)s'
)

# 定义常量
TOKEN = ''
PROJECT_ID = '887100'
PROJECT_TYPE = '1'

# 获取手机号码

def get_mobile():
    print('请求获取手机号码...')
    response = requests.post('http://h5.do889.com:81/api/get_mobile', data={
      'token': TOKEN + ';project_id=' + PROJECT_ID + ';project_type=' + PROJECT_TYPE + ';operator=0;loop=1',
    })
    print(f'获取手机号码请求的响应: {response.status_code}, 内容: {response.text}')
    mobile_data = response.json()
    return mobile_data.get('mobile')

# 发送验证码请求

def send_code(phone_number):
    print('请求发送验证码...')
    code_response = requests.post('https://sophnet.com/api/sys/code', json={
      'phoneNumber': phone_number
    })
    print(f'发送验证码请求的响应: {code_response.status_code}, 内容: {code_response.text}')
    return code_response

# 轮询获取短信

def poll_for_sms(phone_number):
    print('开始轮询获取短信...')
    start_time = time.time()# 记录开始时间
    while True:
      message_response = requests.post('http://h5.do889.com:81/api/get_message', data={
            'token': TOKEN + ';phone_num=' + phone_number + ';project_type=' + PROJECT_TYPE + ';project_id=' + PROJECT_ID
      })
      print(f'获取短信请求的响应: {message_response.status_code}, 内容: {message_response.text}')
      message_data = message_response.json()
      if message_data['message'] == '短信还未到达,请继续获取':
            print('短信还未到达,继续轮询...')
            if time.time() - start_time > 30:# 检查是否超过 30 秒
                print('超过 30 秒未收到短信,跳过此次...')
                return None
            time.sleep(5)# 等待 5 秒后重试
      else:
            code = message_data['data']['modle'].split('验证码:').split(',')
            print(f'获取到的验证码: {code}')
            return code

# 登录并获取 access token

def login(phone_number, code):
    print('请求登录并获取 access token...')
    login_response = requests.post('https://sophnet.com/api/sys/login/code', json={
      'phoneNumber': phone_number,
      'code': code,
      'password': ''
    })
    print(f'登录请求的响应: {login_response.status_code}, 内容: {login_response.text}')
    return login_response.json()['result']['token']

# 获取组织ID

def get_org_id(access_token):
    print('获取组织ID...')
    org_response = requests.get('https://sophnet.com/api/orgs/joined-org', headers={
      'Authorization': f'Bearer {access_token}'
    })
    print(f'获取组织ID响应: {org_response.status_code}, 内容: {org_response.text}')
    return org_response.json()['result']['list']['id']

# 获取项目ID

def get_project_id(org_id, access_token):
    print('获取项目ID...')
    project_response = requests.get(f'https://sophnet.com/api/orgs/{org_id}/projects', params={
      'pageNum': 1,
      'pageSize': 99999
    }, headers={
      'Authorization': f'Bearer {access_token}'
    })
    print(f'获取项目ID响应: {project_response.status_code}, 内容: {project_response.text}')
    return project_response.json()['result']['list']['id']

# 请求服务

def request_service(project_id, access_token, display_model_id):
    print('请求服务...')
    service_response = requests.post(f'https://sophnet.com/api/projects/{project_id}/service', json={
      "displayModelId": display_model_id,
      "llmType": "ChatCompletion",
      "quota": 200
    }, headers={
      'Authorization': f'Bearer {access_token}'
    })
    print(f'请求服务响应: {service_response.status_code}, 内容: {service_response.text}')
    return service_response

# 请求 API Key

def request_api_key(project_id, access_token):
    print('请求服务以获取 API Key...')
    api_key_response = requests.post(f'https://sophnet.com/api/projects/{project_id}/apikey', headers={
      'Authorization': f'Bearer {access_token}'
    }, json={
      'name': 'API Key'
    })
    print(f'获取 API Key 请求的响应: {api_key_response.status_code}, 内容: {api_key_response.text}')
    return api_key_response.json()['result']['apikey']

def process_single_flow():
    """执行单个完整的流程"""
    try:
      phone_number = get_mobile()
      if not phone_number:
            logging.info("获取手机号失败")
            return

      code_response = send_code(phone_number)
      if not code_response:
            logging.info(f"发送验证码失败: {phone_number}")
            return

      code = poll_for_sms(phone_number)
      if not code:
            logging.info(f"获取验证码失败: {phone_number}")
            return

      access_token = login(phone_number, code)
      if not access_token:
            logging.info(f"登录失败: {phone_number}")
            return

      org_id = get_org_id(access_token)
      if not org_id:
            logging.info(f"获取组织ID失败: {phone_number}")
            return

      project_id = get_project_id(org_id, access_token)
      if not project_id:
            logging.info(f"获取项目ID失败: {phone_number}")
            return

      # 并发执行服务请求和API密钥获取
      with ThreadPoolExecutor() as executor:
            # 先执行服务请求
            service_futures = [
                executor.submit(request_service, project_id, access_token, 11),
                executor.submit(request_service, project_id, access_token, 6)
            ]
            # 等待服务请求完成
            for future in service_futures:
                service_response = future.result()
                if not service_response:
                  logging.error("服务请求失败")
                  return False
            
            # 获取API密钥
            api_key_future = executor.submit(request_api_key, project_id, access_token)
            api_key = api_key_future.result()
            if api_key:
                with open('result.txt', 'a') as f:
                  f.write(api_key + '\n')# 保存 API 密钥
                logging.info(f"API Key 已保存: {phone_number}")
            else:
                logging.error("获取 API Key 失败")
                return False

      logging.info(f"流程完成: {phone_number}")
      return True

    except Exception as e:
      logging.error(f"流程出错: {str(e)}")
      return False

def main(concurrent_flows=5):
    """
    主函数,控制并发流程数量,当一个流程完成后立即启动新的流程
    :param concurrent_flows: 最大并发流程数量
    """
    logging.info(f"开始运行,最大并发数: {concurrent_flows}")
   
    # 创建一个长期运行的线程池
    with ThreadPoolExecutor(max_workers=concurrent_flows) as executor:
      running_futures = set()
      
      while True:
            # 检查并移除已完成的任务
            done_futures = {f for f in running_futures if f.done()}
            for future in done_futures:
                try:
                  result = future.result()
                  if result:
                        logging.info("一个流程成功完成")
                  else:
                        logging.info("一个流程失败")
                except Exception as e:
                  logging.error(f"流程执行出错: {str(e)}")
                running_futures.remove(future)
            
            # 当有空闲线程时,立即启动新的流程
            while len(running_futures) < concurrent_flows:
                new_future = executor.submit(process_single_flow)
                running_futures.add(new_future)
                logging.info("启动新的流程")
            
            # 短暂休眠以避免CPU过度使用
            time.sleep(0.1)

if __name__ == "__main__":
    # 设置并发数量为5,您可以根据需要调整这个数字
    main(concurrent_flows=20)

nicexi 发表于 7 天前

备案的,高级模型还只有v3和r1感觉没必要
页: [1]
查看完整版本: 算能 SophNet 免费 DeepSeek R1 满血 API