返回列表 发布新帖
查看: 63|回复: 1

relingo也重现江湖吧

发表于 前天 14:33 | 查看全部 |阅读模式

立刻注册账号,享受更清爽的界面!

您需要 登录 才可以下载或查看,没有账号?注册

×
邀请码不会找自己问其他人。刷满 10 年,自动升级为永久会员
import concurrent.futures
import random
import re
import threading
import time

import requests
from fake_useragent import UserAgent
from loguru import logger

referrer = "你的邀请码"


class WordGenerator:
    def __init__(self):
        # 常用辅音字母
        self.consonants = "bcdfghjklmnpqrstvwxyz"
        # 元音字母
        self.vowels = "aeiou"
        # 常用的字母组合
        self.common_pairs = [
            "th",
            "ch",
            "sh",
            "ph",
            "wh",
            "br",
            "cr",
            "dr",
            "fr",
            "gr",
            "pr",
            "tr",
        ]
        # 常用的词尾
        self.common_endings = ["ing", "ed", "er", "est", "ly", "tion", "ment"]
        # 常用的用户名后缀
        self.username_suffixes = [
            "123",
            "888",
            "666",
            "777",
            "999",
            "pro",
            "cool",
            "good",
            "best",
        ]

    def generate_syllable(self):
        """生成一个音节"""
        if random.random() < 0.3 and self.common_pairs:  # 30% 概率使用常用字母组合
            return random.choice(self.common_pairs) + random.choice(self.vowels)
        else:
            return random.choice(self.consonants) + random.choice(self.vowels)

    def generate_word(self, min_length=4, max_length=8):
        """生成一个随机单词"""
        word = ""
        target_length = random.randint(min_length, max_length)

        # 添加音节直到达到目标长度附近
        while len(word) < target_length - 2:
            word += self.generate_syllable()

        # 可能添加常用词尾
        if random.random() < 0.3 and len(word) < max_length - 2:
            word += random.choice(self.common_endings)
        elif len(word) < target_length:
            word += random.choice(self.consonants)

        return word.lower()

    def generate_random_username(self, min_length=3, max_length=8):
        """生成随机用户名"""
        username = self.generate_word(min_length, max_length)

        # 50% 的概率添加数字或特殊后缀
        if random.random() < 0.5:
            if random.random() < 0.7:  # 70% 概率添加数字
                username += str(random.randint(0, 999)).zfill(random.randint(2, 3))
            else:  # 30% 概率添加特殊后缀
                username += random.choice(self.username_suffixes)

        return username

    def generate_combined_username(self, num_words=1, separator="_"):
        """生成完整的组合用户名"""
        # 首先生成基础用户名
        base_username = self.generate_random_username()

        # 生成额外的随机单词
        words = [self.generate_word() for _ in range(num_words)]

        # 随机决定用户名放在前面还是后面
        if random.random() < 0.5:
            words.append(base_username)
        else:
            words.insert(0, base_username)

        return separator.join(words)


class MailTmClient:
    baseurl = "https://api.mail.tm"

    def __init__(self, user=None):
        # 初始化token为None,防止未设置时引发属性错误
        self.token = None
        self.acount = None

        if user is None:
            generator = WordGenerator()
            user = generator.generate_combined_username(1)

        # 重试机制
        for attempt in range(3):
            try:
                domain = self.get_domains()
                if not domain:
                    logger.warning(f"尝试 {attempt + 1}/3: 获取域名失败,重试中...")
                    time.sleep(2)
                    continue

                logger.info("Get domain:" + domain)
                self.acount = user + "@" + domain
                logger.info("Get acount:" + self.acount)

                account_result = self.acounts(self.acount)
                if not account_result:
                    logger.warning(f"尝试 {attempt + 1}/3: 创建账户失败,重试中...")
                    time.sleep(2)
                    continue

                token_result = self.get_token(self.acount)
                if not token_result:
                    logger.warning(f"尝试 {attempt + 1}/3: 获取令牌失败,重试中...")
                    time.sleep(2)
                    continue

                # 成功初始化
                break
            except Exception as e:
                logger.error(f"初始化邮箱客户端出错 (尝试 {attempt + 1}/3): {str(e)}")
                if attempt < 2:  # 如果不是最后一次尝试,则等待后重试
                    time.sleep(3)
                else:
                    raise Exception(f"初始化邮箱客户端失败,已重试3次: {str(e)}")

        # 检查是否成功初始化
        if not self.token or not self.acount:
            raise Exception("邮箱客户端初始化失败: token或账户未成功创建")

    def get_email(self):
        return self.acount

    def get_domains(self):
        try:
            response = requests.get(f"{self.baseurl}/domains", timeout=10)
            if response.status_code != 200:
                logger.error(f"获取域名失败: 状态码 {response.status_code}")
                return None

            response_data = response.json()
            if (
                "hydra:member" not in response_data
                or len(response_data["hydra:member"]) == 0
            ):
                logger.error("获取域名失败: 返回数据结构异常")
                return None

            return response_data["hydra:member"][0]["domain"]
        except Exception as e:
            logger.error(f"获取域名时出错: {str(e)}")
            return None

    def acounts(self, acount):
        try:
            json_data = {
                "address": acount,
                "password": "thisispassword",
            }
            response = requests.post(
                "https://api.mail.tm/accounts", json=json_data, timeout=10
            )

            if response.status_code != 201 and response.status_code != 200:
                logger.error(f"创建账户失败: 状态码 {response.status_code}")
                if response.text:
                    logger.info("acounts:" + response.text)
                return None

            if not response.text:
                logger.error("创建账户失败: 空响应")
                return None

            logger.info("acounts:" + response.text)
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"创建账户请求出错: {str(e)}")
            return None
        except ValueError as e:
            logger.error(f"解析账户响应出错: {str(e)}")
            return None

    def get_token(self, acount):
        try:
            json_data = {
                "address": acount,
                "password": "thisispassword",
            }

            response = requests.post(
                "https://api.mail.tm/token", json=json_data, timeout=10
            )

            if response.status_code != 200:
                logger.error(f"获取令牌失败: 状态码 {response.status_code}")
                if response.text:
                    logger.info("get_token:" + response.text)
                return None

            if not response.text:
                logger.error("获取令牌失败: 空响应")
                return None

            logger.info("get_token:" + response.text)
            response_data = response.json()

            if "token" not in response_data:
                logger.error("获取令牌失败: 返回数据中无token字段")
                return None

            self.token = response_data["token"]
            return self.token
        except requests.exceptions.RequestException as e:
            logger.error(f"获取令牌请求出错: {str(e)}")
            return None
        except ValueError as e:
            logger.error(f"解析令牌响应出错: {str(e)}")
            return None

    def getmessage(self):
        try:
            # 先检查token是否存在
            if not hasattr(self, "token") or self.token is None:
                logger.error("获取消息失败: token未初始化")
                return None

            headers = {
                "accept": "*/*",
                "accept-language": "zh-CN,zh;q=0.9",
                "authorization": "Bearer " + self.token,
                "cache-control": "no-cache",
                "origin": "https://mail.tm",
                "pragma": "no-cache",
                "priority": "u=1, i",
                "referer": "https://mail.tm/",
                "sec-ch-ua": '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
                "sec-ch-ua-mobile": "?0",
                "sec-ch-ua-platform": '"Windows"',
                "sec-fetch-dest": "empty",
                "sec-fetch-mode": "cors",
                "sec-fetch-site": "same-site",
                "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
            }
            response = requests.get(
                "https://api.mail.tm/messages", headers=headers, timeout=10
            )

            if response.status_code != 200:
                logger.error(f"获取消息失败: 状态码 {response.status_code}")
                return None

            response_data = response.json()

            if (
                "hydra:totalItems" in response_data
                and response_data["hydra:totalItems"] > 0
            ):
                return response_data["hydra:member"][0]["intro"]
            else:
                return None
        except Exception as e:
            logger.error(f"获取消息时出错: {str(e)}")
            return None

    def wait_getmessage(self, max_wait_time=60):  # 缩短默认等待时间为60秒
        # 先检查token是否初始化
        if not hasattr(self, "token") or self.token is None:
            logger.error("等待消息失败: token未初始化")
            return None

        start_time = time.time()
        check_count = 0

        while True:
            try:
                message = self.getmessage()
                if message is not None:
                    return message

                # 检查是否超时
                if time.time() - start_time > max_wait_time:
                    logger.error(f"等待消息超时,已等待{max_wait_time}秒")
                    return None

                check_count += 1
                if check_count > 15:  # 如果检查超过15次仍未收到邮件,认为邮箱有问题
                    logger.warning(f"多次检查未收到邮件,邮箱可能有问题: {self.acount}")
                    return None

                logger.info(f"等待邮件中... ({check_count}/15)")
                time.sleep(2)  # 增加等待时间,减轻请求频率
            except Exception as e:
                logger.error(f"等待消息时出错: {str(e)}")
                # 如果是token相关错误,直接返回None
                if "token" in str(e).lower():
                    logger.error("token错误,放弃当前邮箱")
                    return None
                time.sleep(2)


class RelingoReg:
    def __init__(self):
        self.mm = MailTmClient()
        self.email = self.mm.get_email()

        ua = UserAgent(platforms="desktop")

        self.headers = {
            "authority": "api.relingo.net",
            "accept": "*/*",
            "content-type": "application/json",
            "origin": "chrome-extension://dpphkcfmnbkdpmgneljgdhfnccnhmfig",
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "none",
            "user-agent": ua.random,
            "x-relingo-dest-lang": "en",
            "x-relingo-lang": "zh",
            "x-relingo-platform": "extension",
            "x-relingo-referrer": "https://relingo.net/en/try?relingo-drawer=account",
            "x-relingo-version": "3.16.6",
        }
        self.cookies = {"referrer": referrer}

    def sendcode(self):
        try:
            json_data = {
                "email": self.email,
                "type": "LOGIN",
            }

            response = requests.post(
                "https://api.relingo.net/api/sendPasscode",
                cookies=self.cookies,
                headers=self.headers,
                json=json_data,
                timeout=10,
            )

            if response.status_code != 200:
                logger.error(f"发送验证码失败: 状态码 {response.status_code}")
                return False

            logger.info("Relingo_SendCode:" + response.text)
            return True
        except Exception as e:
            logger.error(f"发送验证码时出错: {str(e)}")
            return False

    def reg(self, code):
        try:
            json_data = {
                "type": "PASSCODE",
                "email": self.email,
                "code": code,
                "referrer": referrer,
            }

            response = requests.post(
                "https://api.relingo.net/api/login",
                cookies=self.cookies,
                headers=self.headers,
                json=json_data,
                timeout=10,
            )

            if response.status_code != 200:
                logger.error(f"注册失败: 状态码 {response.status_code}")
                return False

            logger.info("Relingo_Reg:" + response.text)
            return True
        except Exception as e:
            logger.error(f"注册时出错: {str(e)}")
            return False

    def start(self):
        try:
            if not self.sendcode():
                logger.error("发送验证码失败")
                return False

            # 等待并获取验证码
            message = self.mm.wait_getmessage(180)  # 最多等待3分钟

            if not message:
                logger.error("未收到验证码邮件")
                return False

            logger.info("Get message:" + message)
            result = re.search(r"\d+", message)

            if result:
                code = result.group(0)
                return self.reg(code)
            else:
                logger.error("无法从邮件中提取验证码")
                return False
        except Exception as e:
            logger.error(f"注册出错: {str(e)}")
            return False


def register_task(task_id, success_counter):
    try:
        logger.info(f"任务 {task_id} 开始注册")

        # 添加重试机制
        for attempt in range(5):  # 增加到5次尝试
            try:
                # 创建RelingoReg对象,可能会失败
                try:
                    relingoreg = RelingoReg()
                except Exception as e:
                    logger.error(f"创建RelingoReg对象失败: {str(e)}")
                    # 如果是最后一次尝试,则抛出异常,否则重试
                    if attempt >= 4:
                        raise
                    continue

                # 开始注册过程
                result = relingoreg.start()

                if result:
                    with success_counter_lock:
                        success_counter[0] += 1
                    logger.success(
                        f"任务 {task_id} 注册成功! 总成功次数: {success_counter[0]}"
                    )
                    return True
                else:
                    logger.warning(f"任务 {task_id} 注册失败 (尝试 {attempt + 1}/5)")
                    # 不等待,直接尝试新邮箱
                    logger.info(f"任务 {task_id} 跳过当前邮箱,尝试新邮箱")
            except Exception as e:
                logger.error(
                    f"任务 {task_id} 发生错误 (尝试 {attempt + 1}/5): {str(e)}"
                )
                if attempt < 4:  # 如果不是最后一次尝试,则继续
                    time.sleep(1)

        logger.error(f"任务 {task_id} 失败,已重试5次")
        return False
    except Exception as e:
        logger.error(f"任务 {task_id} 发生严重错误: {str(e)}")
        return False


if __name__ == "__main__":
    success_counter = [0]  # 使用列表存储计数器,方便在线程间共享
    success_counter_lock = threading.Lock()  # 线程锁,防止计数冲突

    fail_counter = [0]  # 失败计数器
    fail_counter_lock = threading.Lock()  # 线程锁

    # 配置日志
    logger.remove()
    logger.add(
        "relingo_reg_{time}.log",
        rotation="100 MB",
        level="INFO",
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level}</level> | <cyan>{message}</cyan>",
    )
    logger.add(
        lambda msg: print(msg, end=""),
        level="INFO",
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level}</level> | <cyan>{message}</cyan>",
    )

    logger.info("Relingo自动注册程序开始运行...")
    logger.info("按 Ctrl+C 停止程序")

    task_id = 0
    max_workers = 3  # 减少并发数,避免API限制

    # 添加任务间延迟,避免同时发起大量请求
    task_delay = 1  # 减少任务延迟到1秒

    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            while True:
                # 提交任务到线程池
                futures = []
                for i in range(max_workers):
                    task_id += 1
                    # 添加任务间延迟
                    time.sleep(task_delay)
                    futures.append(
                        executor.submit(register_task, task_id, success_counter)
                    )

                # 等待所有任务完成
                complete_count = 0
                for future in concurrent.futures.as_completed(futures):
                    result = future.result()
                    if not result:
                        with fail_counter_lock:
                            fail_counter[0] += 1
                    complete_count += 1
                    # 显示进度
                    logger.info(f"当前批次完成: {complete_count}/{len(futures)}")

                logger.info(
                    f"当前总成功次数: {success_counter[0]} | 失败次数: {fail_counter[0]}"
                )
                time.sleep(3)  # 批次间间隔时间缩短到3秒
    except KeyboardInterrupt:
        logger.info("程序被用户中断")
        logger.info(
            f"总共成功注册 {success_counter[0]} 个账号 | 失败 {fail_counter[0]} 次"
        )
    except Exception as e:
        logger.error(f"程序发生错误: {str(e)}")
    finally:
        logger.info(
            f"程序结束,总成功次数: {success_counter[0]} | 失败次数: {fail_counter[0]}"
        )
爱生活,爱奶昔~
发表于 前天 15:27 | 查看全部
Relingo_SendCode:{"code":2219,"message":"Email cannot be send"}
爱生活,爱奶昔~
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

  • 关注公众号
  • 添加微信客服
© 2025 Naixi Networks 沪ICP备13020230号-1|沪公网安备 31010702007642号
关灯 在本版发帖
扫一扫添加微信客服
返回顶部
快速回复 返回顶部 返回列表