123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435 |
import os
from datetime import datetime, timezone
from typing import Optional

import mysql.connector
from mysql.connector import pooling

from server.module.TweetModel import TweetModel
from server.module.UserModel import User
# MySQL connection settings used to build the connection pool.
#
# Each connection value can be overridden through an environment variable so
# that deployments do not have to rely on credentials committed to source
# control. The literals are the previous hard-coded values, kept only as
# fallbacks so existing setups keep working unchanged.
# SECURITY NOTE(review): a root password and a public host address are stored
# in this file; rotate the password and supply it via TWITTER_DB_PASSWORD.
DB_CONFIG = {
    'host': os.environ.get('TWITTER_DB_HOST', '117.78.31.244'),
    'user': os.environ.get('TWITTER_DB_USER', 'root'),
    'password': os.environ.get('TWITTER_DB_PASSWORD', 'zh123456'),
    'database': os.environ.get('TWITTER_DB_NAME', 'twitter_spider'),
    # utf8mb4 is required to store 4-byte characters such as emoji.
    'charset': 'utf8mb4',
}
def convert_to_mysql_datetime(date_str: str) -> Optional[str]:
    """Convert a tweet timestamp into a MySQL ``DATETIME`` string in UTC.

    The expected input looks like ``'Thu Feb 20 00:38:20 +0000 2025'``. Any
    numeric UTC offset is accepted and the result is normalized to UTC, so an
    input with ``+0000`` produces exactly the same output as before.

    Args:
        date_str: Timestamp text in ``'%a %b %d %H:%M:%S %z %Y'`` format.

    Returns:
        The timestamp formatted as ``'YYYY-MM-DD HH:MM:SS'``, or ``None`` when
        the input is missing or cannot be parsed (the error is printed).
    """
    try:
        parsed = datetime.strptime(date_str, '%a %b %d %H:%M:%S %z %Y')
    except (ValueError, TypeError) as e:
        # TypeError covers None / non-string input, which strptime rejects
        # before it even looks at the format.
        print(f"Error converting time: {e}")
        return None
    # Shift to UTC before dropping the offset so stored values are comparable.
    return parsed.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
class DatabaseHandler:
    """MySQL-backed storage for scraped tweets and users.

    Every public method borrows one connection from a pool built from
    ``DB_CONFIG`` and hands it back before returning, including when a
    statement raises. Write methods are best-effort: a
    ``mysql.connector.Error`` is printed and the transaction rolled back
    instead of being raised to the caller. Read methods let errors propagate.

    NOTE(review): the ``async`` methods call the blocking connector directly,
    so the event loop is held while a query runs.
    """

    # Column order shared by the users INSERT / UPDATE / SELECT statements and
    # by the keys read from ``user_data``. ``id`` must stay first.
    _USER_COLUMNS = (
        'id', 'name', 'screen_name', 'profile_image_url', 'profile_banner_url',
        'url', 'location', 'description', 'is_blue_verified', 'verified',
        'possibly_sensitive', 'can_dm', 'can_media_tag', 'want_retweets',
        'default_profile', 'default_profile_image', 'followers_count',
        'fast_followers_count', 'normal_followers_count', 'following_count',
        'favourites_count', 'listed_count', 'media_count', 'statuses_count',
        'is_translator', 'translator_type', 'profile_interstitial_type',
        'withheld_in_countries',
    )

    _USER_INSERT_SQL = (
        "INSERT INTO users (" + ", ".join(_USER_COLUMNS) + ") "
        "VALUES (" + ", ".join(["%s"] * len(_USER_COLUMNS)) + ")"
    )
    _USER_UPDATE_SQL = (
        "UPDATE users SET "
        + ", ".join(column + " = %s" for column in _USER_COLUMNS[1:])
        + " WHERE id = %s"
    )
    # Explicit columns (instead of SELECT *) keep the positional mapping in
    # get_all_user correct even if the table's column order changes.
    _USER_SELECT_SQL = (
        "SELECT " + ", ".join(_USER_COLUMNS) + " FROM users LIMIT %s OFFSET %s"
    )

    def __init__(self):
        """Create the connection pool from the module-level ``DB_CONFIG``."""
        self.db_config = DB_CONFIG
        self.pool = pooling.MySQLConnectionPool(
            pool_name="mypool",
            pool_size=5,  # maximum number of pooled connections
            **DB_CONFIG
        )

    def get_connection(self):
        """Borrow a connection from the pool; closing it returns it."""
        return self.pool.get_connection()

    def _execute_write(self, query, params, label):
        """Run one write statement in its own transaction.

        On ``mysql.connector.Error`` the error is printed as
        ``"Error inserting <label>: ..."`` and the transaction is rolled back.
        The cursor and connection are always released.
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(query, params)
                conn.commit()
            except mysql.connector.Error as err:
                print(f"Error inserting {label}: {err}")
                conn.rollback()
            finally:
                cursor.close()
        finally:
            conn.close()

    def _fetch_page(self, rows_query, count_query, page, page_size, dictionary=False):
        """Return ``(rows, total)`` for one page of a table.

        ``rows_query`` must end in ``LIMIT %s OFFSET %s``. When ``dictionary``
        is true the rows come back as dicts keyed by column name. The total is
        always read with a plain cursor so it can be indexed by position.
        """
        # Pages below 1 are treated as the first page rather than sending a
        # negative offset to MySQL.
        offset = max(page - 1, 0) * page_size
        conn = self.get_connection()
        try:
            cursor = conn.cursor(dictionary=dictionary)
            try:
                cursor.execute(rows_query, (page_size, offset))
                rows = cursor.fetchall()
            finally:
                cursor.close()
            cursor = conn.cursor()
            try:
                cursor.execute(count_query)
                total = cursor.fetchone()[0]
            finally:
                cursor.close()
        finally:
            conn.close()
        return rows, total

    async def save_tweet(self, tweet, tweet_type, latest_cursor):
        """Insert a tweet row, then its media, hashtags, URLs, replies and
        related tweets.

        Args:
            tweet: Tweet object exposing the attributes read below.
            tweet_type: Value stored in the ``tweet_type`` column.
            latest_cursor: Pagination cursor stored in ``next_cursor``.

        Child rows are attempted even if the tweet insert itself failed,
        matching the previous behaviour.
        """
        query = """
            INSERT INTO tweets (id, created_at, user_id, text, lang, in_reply_to,
                is_quote_status, quote_id, retweeted_tweet_id, possibly_sensitive, quote_count,
                reply_count, favorite_count, favorited, view_count, retweet_count, bookmark_count,
                bookmarked, place, is_translatable, is_edit_eligible, edits_remaining, tweet_type, next_cursor)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        # Built before a connection is borrowed, so a malformed tweet object
        # cannot strand a pooled connection.
        params = (
            tweet.id,
            convert_to_mysql_datetime(tweet.created_at),
            tweet.user.id,
            tweet.text,
            tweet.lang,
            tweet.in_reply_to,
            tweet.is_quote_status,
            tweet.quote.id if tweet.quote else None,
            tweet.retweeted_tweet.id if tweet.retweeted_tweet else None,
            tweet.possibly_sensitive,
            tweet.quote_count,
            tweet.reply_count,
            tweet.favorite_count,
            tweet.favorited,
            tweet.view_count,
            tweet.retweet_count,
            tweet.bookmark_count,
            tweet.bookmarked,
            tweet.place if tweet.place else None,
            tweet.is_translatable,
            tweet.is_edit_eligible,
            tweet.edits_remaining,
            tweet_type,
            latest_cursor,
        )
        self._execute_write(query, params, "tweet")

        # ``or ()`` lets a missing (None) collection behave like an empty one.
        for media in tweet.media or ():
            self.save_media(tweet.id, media)
        for hashtag in tweet.hashtags or ():
            self.save_hashtag(tweet.id, hashtag)
        for url_obj in tweet.urls or ():
            # Store the fully expanded URL; entries without one are skipped.
            url = url_obj.get('expanded_url')
            if url:
                self.save_url(tweet.id, url)
        for reply in tweet.replies or ():
            self.save_reply(tweet.id, reply.id)
        for related in tweet.related_tweets or ():
            self.save_related_tweet(tweet.id, related.id)

    def save_media(self, tweet_id, media):
        """Insert one media row (``media.url``, ``media.type``) for a tweet."""
        query = """
            INSERT INTO media (tweet_id, media_url, media_type)
            VALUES (%s, %s, %s)
        """
        self._execute_write(query, (tweet_id, media.url, media.type), "media")

    def save_hashtag(self, tweet_id, hashtag):
        """Insert one hashtag row for a tweet."""
        query = """
            INSERT INTO hashtags (tweet_id, hashtag)
            VALUES (%s, %s)
        """
        self._execute_write(query, (tweet_id, hashtag), "hashtag")

    def save_url(self, tweet_id, url):
        """Insert one URL row for a tweet."""
        query = """
            INSERT INTO urls (tweet_id, url)
            VALUES (%s, %s)
        """
        self._execute_write(query, (tweet_id, url), "URL")

    def save_reply(self, tweet_id, reply_tweet_id):
        """Record that ``reply_tweet_id`` is a reply to ``tweet_id``."""
        query = """
            INSERT INTO replies (tweet_id, reply_tweet_id)
            VALUES (%s, %s)
        """
        self._execute_write(query, (tweet_id, reply_tweet_id), "reply")

    def save_related_tweet(self, tweet_id, related_tweet_id):
        """Record that ``related_tweet_id`` is related to ``tweet_id``."""
        query = """
            INSERT INTO related_tweets (tweet_id, related_tweet_id)
            VALUES (%s, %s)
        """
        self._execute_write(query, (tweet_id, related_tweet_id), "related tweet")

    def get_latest_twitter_id(self, user_id: str, tweet_type: Optional[str] = None) -> Optional[str]:
        """Return ``next_cursor`` of the user's most recent stored tweet.

        Despite the method name, the value read is the ``next_cursor`` column
        of the row with the latest ``created_at``.

        Args:
            user_id: Owner of the tweets to search.
            tweet_type: Optional filter on the ``tweet_type`` column.

        Returns:
            The stored cursor, or ``None`` when no row matches or the query
            fails (the error is printed).
        """
        query = "SELECT next_cursor FROM tweets WHERE user_id = %s"
        params = [user_id]
        if tweet_type:
            query += " AND tweet_type = %s"
            params.append(tweet_type)
        query += " ORDER BY created_at DESC LIMIT 1"

        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(query, tuple(params))
                result = cursor.fetchone()
                return result[0] if result else None
            except mysql.connector.Error as err:
                print(f"Error fetching latest cursor: {err}")
                return None
            finally:
                cursor.close()
        finally:
            conn.close()

    async def save_user(self, user_data):
        """Insert the user, or update the existing row with the same ``id``.

        Args:
            user_data: Mapping holding every key listed in ``_USER_COLUMNS``.

        Raises:
            KeyError: If ``user_data`` lacks one of the expected keys.

        Database errors are printed and rolled back rather than raised.
        """
        print(user_data)
        # Read every field up front so a malformed mapping fails before a
        # connection is borrowed.
        values = tuple(user_data[column] for column in self._USER_COLUMNS)

        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("SELECT COUNT(*) FROM users WHERE id = %s", (values[0],))
                if cursor.fetchone()[0] > 0:
                    # UPDATE binds the id last, for the WHERE clause.
                    cursor.execute(self._USER_UPDATE_SQL, values[1:] + values[:1])
                else:
                    cursor.execute(self._USER_INSERT_SQL, values)
                conn.commit()
            except mysql.connector.Error as err:
                print(f"Error inserting user: {err}")
                conn.rollback()
            finally:
                cursor.close()
        finally:
            conn.close()

    async def get_all_user(self, page: int = 1, page_size: int = 10):
        """Return one page of users and the total number of users.

        Args:
            page: 1-based page number.
            page_size: Maximum number of users on the page.

        Returns:
            ``(users, total_records)`` where ``users`` is a list of ``User``.
        """
        rows, total_records = self._fetch_page(
            self._USER_SELECT_SQL, "SELECT COUNT(*) FROM users", page, page_size
        )
        users = [User(**dict(zip(self._USER_COLUMNS, row))) for row in rows]
        return users, total_records

    async def get_all_twitter(self, page: int = 1, page_size: int = 10):
        """Return one page of tweets and the total number of tweets.

        Args:
            page: 1-based page number.
            page_size: Maximum number of tweets on the page.

        Returns:
            ``(tweets, total_records)`` where ``tweets`` is a list of
            ``TweetModel`` built from the row's column names.
        """
        # Rows must be dicts: TweetModel(**row) cannot unpack a tuple.
        rows, total_records = self._fetch_page(
            "SELECT * FROM tweets LIMIT %s OFFSET %s",
            "SELECT COUNT(*) FROM tweets",
            page,
            page_size,
            dictionary=True,
        )
        tweets = [TweetModel(**row) for row in rows]
        return tweets, total_records

    async def get_all_user_ids(self):
        """Return the ``id`` of every stored user as a list."""
        # Uses the pool like every other method instead of opening a
        # dedicated connection.
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("SELECT id FROM users")
                return [row[0] for row in cursor.fetchall()]
            finally:
                cursor.close()
        finally:
            conn.close()
|