返回列表 发布新帖
查看: 54|回复: 1

算能 SophNet 免费 DeepSeek R1 满血 API

发表于 前天 15:05 | 查看全部 |阅读模式

立刻注册账号,享受更清爽的界面!

您需要 登录 才可以下载或查看,没有账号?注册

×
今天又发现一家免费的第三方 DeepSeek R1 API,注册就有免费额度 50¥(现在缩减到 20¥了,而且多了个邀请码机制,不过被邀请的没有额外额度
https://sophnet.com/#/model/overview
官网标参数如下:
8cbb5079db65a2bf7abc577166ed3edd.jpg
实测速度非常快,比华为云和腾讯云 LKEAP 都快,推理信息放在 reasoning_content 字段。
懒得注册的自己玩,附上脚本
import requests
import time
from concurrent.futures import ThreadPoolExecutor
import logging
from datetime import datetime

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(message)s'
)

# 定义常量
TOKEN = ''
PROJECT_ID = '887100'
PROJECT_TYPE = '1'

# 获取手机号码

def get_mobile():
    print('请求获取手机号码...')
    response = requests.post('http://h5.do889.com:81/api/get_mobile', data={
        'token': TOKEN + ';project_id=' + PROJECT_ID + ';project_type=' + PROJECT_TYPE + ';operator=0;loop=1',
    })
    print(f'获取手机号码请求的响应: {response.status_code}, 内容: {response.text}')
    mobile_data = response.json()
    return mobile_data.get('mobile')

# 发送验证码请求

def send_code(phone_number):
    print('请求发送验证码...')
    code_response = requests.post('https://sophnet.com/api/sys/code', json={
        'phoneNumber': phone_number
    })
    print(f'发送验证码请求的响应: {code_response.status_code}, 内容: {code_response.text}')
    return code_response

# 轮询获取短信

def poll_for_sms(phone_number):
    print('开始轮询获取短信...')
    start_time = time.time()  # 记录开始时间
    while True:
        message_response = requests.post('http://h5.do889.com:81/api/get_message', data={
            'token': TOKEN + ';phone_num=' + phone_number + ';project_type=' + PROJECT_TYPE + ';project_id=' + PROJECT_ID
        })
        print(f'获取短信请求的响应: {message_response.status_code}, 内容: {message_response.text}')
        message_data = message_response.json()
        if message_data['message'] == '短信还未到达,请继续获取':
            print('短信还未到达,继续轮询...')
            if time.time() - start_time > 30:  # 检查是否超过 30 秒
                print('超过 30 秒未收到短信,跳过此次...')
                return None
            time.sleep(5)  # 等待 5 秒后重试
        else:
            code = message_data['data'][0]['modle'].split('验证码:')[1].split(',')[0]
            print(f'获取到的验证码: {code}')
            return code

# 登录并获取 access token

def login(phone_number, code):
    print('请求登录并获取 access token...')
    login_response = requests.post('https://sophnet.com/api/sys/login/code', json={
        'phoneNumber': phone_number,
        'code': code,
        'password': ''
    })
    print(f'登录请求的响应: {login_response.status_code}, 内容: {login_response.text}')
    return login_response.json()['result']['token']

# 获取组织ID

def get_org_id(access_token):
    print('获取组织ID...')
    org_response = requests.get('https://sophnet.com/api/orgs/joined-org', headers={
        'Authorization': f'Bearer {access_token}'
    })
    print(f'获取组织ID响应: {org_response.status_code}, 内容: {org_response.text}')
    return org_response.json()['result']['list'][0]['id']

# 获取项目ID

def get_project_id(org_id, access_token):
    print('获取项目ID...')
    project_response = requests.get(f'https://sophnet.com/api/orgs/{org_id}/projects', params={
        'pageNum': 1,
        'pageSize': 99999
    }, headers={
        'Authorization': f'Bearer {access_token}'
    })
    print(f'获取项目ID响应: {project_response.status_code}, 内容: {project_response.text}')
    return project_response.json()['result']['list'][0]['id']

# 请求服务

def request_service(project_id, access_token, display_model_id):
    print('请求服务...')
    service_response = requests.post(f'https://sophnet.com/api/projects/{project_id}/service', json={
        "displayModelId": display_model_id,
        "llmType": "ChatCompletion",
        "quota": 200
    }, headers={
        'Authorization': f'Bearer {access_token}'
    })
    print(f'请求服务响应: {service_response.status_code}, 内容: {service_response.text}')
    return service_response

# 请求 API Key

def request_api_key(project_id, access_token):
    print('请求服务以获取 API Key...')
    api_key_response = requests.post(f'https://sophnet.com/api/projects/{project_id}/apikey', headers={
        'Authorization': f'Bearer {access_token}'
    }, json={
        'name': 'API Key'
    })
    print(f'获取 API Key 请求的响应: {api_key_response.status_code}, 内容: {api_key_response.text}')
    return api_key_response.json()['result']['apikey']

def process_single_flow():
    """执行单个完整的流程"""
    try:
        phone_number = get_mobile()
        if not phone_number:
            logging.info("获取手机号失败")
            return

        code_response = send_code(phone_number)
        if not code_response:
            logging.info(f"发送验证码失败: {phone_number}")
            return

        code = poll_for_sms(phone_number)
        if not code:
            logging.info(f"获取验证码失败: {phone_number}")
            return

        access_token = login(phone_number, code)
        if not access_token:
            logging.info(f"登录失败: {phone_number}")
            return

        org_id = get_org_id(access_token)
        if not org_id:
            logging.info(f"获取组织ID失败: {phone_number}")
            return

        project_id = get_project_id(org_id, access_token)
        if not project_id:
            logging.info(f"获取项目ID失败: {phone_number}")
            return

        # 并发执行服务请求和API密钥获取
        with ThreadPoolExecutor() as executor:
            # 先执行服务请求
            service_futures = [
                executor.submit(request_service, project_id, access_token, 11),
                executor.submit(request_service, project_id, access_token, 6)
            ]
            # 等待服务请求完成
            for future in service_futures:
                service_response = future.result()
                if not service_response:
                    logging.error("服务请求失败")
                    return False
            
            # 获取API密钥
            api_key_future = executor.submit(request_api_key, project_id, access_token)
            api_key = api_key_future.result()
            if api_key:
                with open('result.txt', 'a') as f:
                    f.write(api_key + '\n')  # 保存 API 密钥
                logging.info(f"API Key 已保存: {phone_number}")
            else:
                logging.error("获取 API Key 失败")
                return False

        logging.info(f"流程完成: {phone_number}")
        return True

    except Exception as e:
        logging.error(f"流程出错: {str(e)}")
        return False

def main(concurrent_flows=5):
    """
    主函数,控制并发流程数量,当一个流程完成后立即启动新的流程
    :param concurrent_flows: 最大并发流程数量
    """
    logging.info(f"开始运行,最大并发数: {concurrent_flows}")
    
    # 创建一个长期运行的线程池
    with ThreadPoolExecutor(max_workers=concurrent_flows) as executor:
        running_futures = set()
        
        while True:
            # 检查并移除已完成的任务
            done_futures = {f for f in running_futures if f.done()}
            for future in done_futures:
                try:
                    result = future.result()
                    if result:
                        logging.info("一个流程成功完成")
                    else:
                        logging.info("一个流程失败")
                except Exception as e:
                    logging.error(f"流程执行出错: {str(e)}")
                running_futures.remove(future)
            
            # 当有空闲线程时,立即启动新的流程
            while len(running_futures) < concurrent_flows:
                new_future = executor.submit(process_single_flow)
                running_futures.add(new_future)
                logging.info("启动新的流程")
            
            # 短暂休眠以避免CPU过度使用
            time.sleep(0.1)

if __name__ == "__main__":
    # 设置并发数量为5,您可以根据需要调整这个数字
    main(concurrent_flows=20)
爱生活,爱奶昔~
发表于 前天 15:05 | 查看全部
备案的,高级模型还只有v3和r1感觉没必要
爱生活,爱奶昔~
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

  • 关注公众号
  • 添加微信客服
© 2025 Naixi Networks 沪ICP备13020230号-1|沪公网安备 31010702007642号
关灯 在本版发帖
扫一扫添加微信客服
返回顶部
快速回复 返回顶部 返回列表