【AGC】云存储服务端使用方法

发布时间 2023-11-23 11:36:39作者: Mayism123

【集成准备】

1、Python环境配置

下载Python和PyCharm并安装。

cke_125677.png​​

使用安装的python本身作为解释器。

cke_134497.png​​

安装AGC Python SDK。

cke_139799.png​​云存储包安装完成。

2、AGC环境配置

在AGC创建项目和应用

cke_148779.png​​

开通云存储服务。

返回项目设置界面,选择Server SDK 页签,在认证凭据处点击创建按钮,然后下载认证凭据。

cke_154152.png​​

将认证凭据导入到项目中

 

【布局设计】

本次测试的Demo是一个Python服务,所以没有界面UI。

 

【功能实现】

引入AGC与云存储模块

from agconnect.cloud_storage import AGCCloudStorageException

from agconnect.cloud_storage import GetFilesOptions

from agconnect.cloud_storage import Metadata

from agconnect.cloud_storage import StorageManagement

from agconnect.common_server import AGCClient, CredentialParser, logger

将下载的凭据文件放入项目中,调用AGCClient.initialize方法初始化AGCClient实例

将配置开发环境中获取的认证凭据放置到自定义的目录,通过initialize方法初始化对应认证凭据的AGCClient实例。

通过StorageManagement()来初始化存储实例。

bucket_name = 'cloudstoragepython-kyuv2'

file_name = 'test.txt' # for example: "test.txt"

config_path = os.path.normpath(os.path.join(os.path.dirname(__file__), 'agc-apiclient-1157238089186298880-7233693580609187129.json'))

credential = CredentialParser.to_credential(config_path)

AGCClient.initialize(credential=credential)

storage = StorageManagement()

bucket = storage.bucket(bucket_name)

上传文件

使用bucket.upload方法将文件上传到云端。

def upload_file(bucket):

config_path = os.path.normpath(os.path.join(os.path.dirname(__file__), 'C:\Users\kwx1075489\Desktop\agcserversdk-python-1.3.0.300\test.txt'))

if not os.path.exists(config_path):

logger.error("file does not exist")

loop = asyncio.new_event_loop()

try:

asyncio.set_event_loop(loop)

result = loop.run_until_complete(bucket.upload(path_str=config_path))

logger.info(f"Upload file response: {result}")

except (Exception, AGCCloudStorageException) as e:

raise AssertionError(f"An error occurred during the upload file process: {e}")

finally:

loop.close()

调用file.get_metadata方法获取设置在云端的元数据

def get_file_metadata(bucket, file_name):

file = bucket.file(file_name)

loop = asyncio.new_event_loop()

try:

asyncio.set_event_loop(loop)

result = loop.run_until_complete(file.get_metadata())

res_json = loop.run_until_complete(result.json(content_type='text/plain'))

logger.info(f"Get file metadata: {res_json}")

except (Exception, AGCCloudStorageException) as e:

raise AssertionError(f"An error occurred during the get file metadata: {e}")

finally:

loop.close()

调用file.set_metadata方法将文件属性的元数据和自定义的元数据覆盖到云端。

def update_file_metadata(bucket, file_name):

file = bucket.file(file_name)

loop = asyncio.new_event_loop()

try:

asyncio.set_event_loop(loop)

get_result = loop.run_until_complete(file.get_metadata())

get_result_text = loop.run_until_complete(get_result.text())

get_result_json = json.loads(get_result_text)

metadata = Metadata(content_language='en-US', custom_metadata={'test': 'test'},

content_type=get_result_json.get('contentType'))

set_result = loop.run_until_complete(file.set_metadata(metadata))

set_result_json = loop.run_until_complete(set_result.json(content_type='text/plain'))

logger.info(f"Update file metadata response: {set_result_json}")

except (Exception, AGCCloudStorageException) as e:

raise AssertionError(f"An error occurred during the update file metadata: {e}")

finally:

loop.close()

调用file.download方法将云端文件数据写入本地文件中。

def download_file(bucket, file_name):

def progress_callback(progress: Dict[str, int] = None):

logger.info(f"Downloaded {progress['writtenBytes']} bytes out of {progress['totalBytes']}")

loop = asyncio.new_event_loop()

asyncio.set_event_loop(loop)

try:

local_file = os.path.normpath(

os.path.join(os.path.dirname(__file__), ' C:\Users\kwx1075489\Desktop'))

if not os.path.exists(local_file):

logger.error("file does not exist")

remote_file = bucket.file(file_name)

text, resp = loop.run_until_complete(remote_file.download(local_file, on_download_progress=progress_callback))

logger.info(f"Download file response: {resp}")

assert text == "File downloaded successfully. "

except (Exception, AGCCloudStorageException) as e:

raise AssertionError(f"An error occurred during the download file process: {e}")

finally:

loop.close()

调用file.download方法获取当前目录下的文件与子目录。

def get_files(bucket):

loop = asyncio.new_event_loop()

asyncio.set_event_loop(loop)

options = GetFilesOptions(delimiter="/")

try:

result = loop.run_until_complete(bucket.get_files(options=options))

logger.info(f"Get file list response: {result}")

except (Exception, AGCCloudStorageException) as e:

raise AssertionError(f"An error occurred during the get file list: {e}")

finally:

loop.close()

调用file.delete方法删除云端文件。

def get_files(bucket):

loop = asyncio.new_event_loop()

asyncio.set_event_loop(loop)

options = GetFilesOptions(delimiter="/")

try:

result = loop.run_until_complete(bucket.get_files(options=options))

logger.info(f"Get file list response: {result}")

except (Exception, AGCCloudStorageException) as e:

raise AssertionError(f"An error occurred during the get file list: {e}")

finally:

loop.close()

 

【功能测试】

执行python main.py命令,服务依次执行:

上传“test.txt”文件到云端:

cke_165928.png​​

从云端下载“test.txt”文件到本地目录:

从云端删除“cloudstoragepython-kyuv2”存储区的“test.txt”文件:

好的,今天就到这里了。