import requests
import time
from concurrent.futures import ThreadPoolExecutor
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(message)s'
)
# 定义常量
TOKEN = ''
PROJECT_ID = '887100'
PROJECT_TYPE = '1'
# 获取手机号码
def get_mobile():
print('请求获取手机号码...')
response = requests.post('http://h5.do889.com:81/api/get_mobile', data={
'token': TOKEN + ';project_id=' + PROJECT_ID + ';project_type=' + PROJECT_TYPE + ';operator=0;loop=1',
})
print(f'获取手机号码请求的响应: {response.status_code}, 内容: {response.text}')
mobile_data = response.json()
return mobile_data.get('mobile')
# 发送验证码请求
def send_code(phone_number):
print('请求发送验证码...')
code_response = requests.post('https://sophnet.com/api/sys/code', json={
'phoneNumber': phone_number
})
print(f'发送验证码请求的响应: {code_response.status_code}, 内容: {code_response.text}')
return code_response
# 轮询获取短信
def poll_for_sms(phone_number):
print('开始轮询获取短信...')
start_time = time.time() # 记录开始时间
while True:
message_response = requests.post('http://h5.do889.com:81/api/get_message', data={
'token': TOKEN + ';phone_num=' + phone_number + ';project_type=' + PROJECT_TYPE + ';project_id=' + PROJECT_ID
})
print(f'获取短信请求的响应: {message_response.status_code}, 内容: {message_response.text}')
message_data = message_response.json()
if message_data['message'] == '短信还未到达,请继续获取':
print('短信还未到达,继续轮询...')
if time.time() - start_time > 30: # 检查是否超过 30 秒
print('超过 30 秒未收到短信,跳过此次...')
return None
time.sleep(5) # 等待 5 秒后重试
else:
code = message_data['data'][0]['modle'].split('验证码:')[1].split(',')[0]
print(f'获取到的验证码: {code}')
return code
# 登录并获取 access token
def login(phone_number, code):
print('请求登录并获取 access token...')
login_response = requests.post('https://sophnet.com/api/sys/login/code', json={
'phoneNumber': phone_number,
'code': code,
'password': ''
})
print(f'登录请求的响应: {login_response.status_code}, 内容: {login_response.text}')
return login_response.json()['result']['token']
# 获取组织ID
def get_org_id(access_token):
print('获取组织ID...')
org_response = requests.get('https://sophnet.com/api/orgs/joined-org', headers={
'Authorization': f'Bearer {access_token}'
})
print(f'获取组织ID响应: {org_response.status_code}, 内容: {org_response.text}')
return org_response.json()['result']['list'][0]['id']
# 获取项目ID
def get_project_id(org_id, access_token):
print('获取项目ID...')
project_response = requests.get(f'https://sophnet.com/api/orgs/{org_id}/projects', params={
'pageNum': 1,
'pageSize': 99999
}, headers={
'Authorization': f'Bearer {access_token}'
})
print(f'获取项目ID响应: {project_response.status_code}, 内容: {project_response.text}')
return project_response.json()['result']['list'][0]['id']
# 请求服务
def request_service(project_id, access_token, display_model_id):
print('请求服务...')
service_response = requests.post(f'https://sophnet.com/api/projects/{project_id}/service', json={
"displayModelId": display_model_id,
"llmType": "ChatCompletion",
"quota": 200
}, headers={
'Authorization': f'Bearer {access_token}'
})
print(f'请求服务响应: {service_response.status_code}, 内容: {service_response.text}')
return service_response
# 请求 API Key
def request_api_key(project_id, access_token):
print('请求服务以获取 API Key...')
api_key_response = requests.post(f'https://sophnet.com/api/projects/{project_id}/apikey', headers={
'Authorization': f'Bearer {access_token}'
}, json={
'name': 'API Key'
})
print(f'获取 API Key 请求的响应: {api_key_response.status_code}, 内容: {api_key_response.text}')
return api_key_response.json()['result']['apikey']
def process_single_flow():
"""执行单个完整的流程"""
try:
phone_number = get_mobile()
if not phone_number:
logging.info("获取手机号失败")
return
code_response = send_code(phone_number)
if not code_response:
logging.info(f"发送验证码失败: {phone_number}")
return
code = poll_for_sms(phone_number)
if not code:
logging.info(f"获取验证码失败: {phone_number}")
return
access_token = login(phone_number, code)
if not access_token:
logging.info(f"登录失败: {phone_number}")
return
org_id = get_org_id(access_token)
if not org_id:
logging.info(f"获取组织ID失败: {phone_number}")
return
project_id = get_project_id(org_id, access_token)
if not project_id:
logging.info(f"获取项目ID失败: {phone_number}")
return
# 并发执行服务请求和API密钥获取
with ThreadPoolExecutor() as executor:
# 先执行服务请求
service_futures = [
executor.submit(request_service, project_id, access_token, 11),
executor.submit(request_service, project_id, access_token, 6)
]
# 等待服务请求完成
for future in service_futures:
service_response = future.result()
if not service_response:
logging.error("服务请求失败")
return False
# 获取API密钥
api_key_future = executor.submit(request_api_key, project_id, access_token)
api_key = api_key_future.result()
if api_key:
with open('result.txt', 'a') as f:
f.write(api_key + '\n') # 保存 API 密钥
logging.info(f"API Key 已保存: {phone_number}")
else:
logging.error("获取 API Key 失败")
return False
logging.info(f"流程完成: {phone_number}")
return True
except Exception as e:
logging.error(f"流程出错: {str(e)}")
return False
def main(concurrent_flows=5):
"""
主函数,控制并发流程数量,当一个流程完成后立即启动新的流程
:param concurrent_flows: 最大并发流程数量
"""
logging.info(f"开始运行,最大并发数: {concurrent_flows}")
# 创建一个长期运行的线程池
with ThreadPoolExecutor(max_workers=concurrent_flows) as executor:
running_futures = set()
while True:
# 检查并移除已完成的任务
done_futures = {f for f in running_futures if f.done()}
for future in done_futures:
try:
result = future.result()
if result:
logging.info("一个流程成功完成")
else:
logging.info("一个流程失败")
except Exception as e:
logging.error(f"流程执行出错: {str(e)}")
running_futures.remove(future)
# 当有空闲线程时,立即启动新的流程
while len(running_futures) < concurrent_flows:
new_future = executor.submit(process_single_flow)
running_futures.add(new_future)
logging.info("启动新的流程")
# 短暂休眠以避免CPU过度使用
time.sleep(0.1)
if __name__ == "__main__":
# 设置并发数量为5,您可以根据需要调整这个数字
main(concurrent_flows=20)