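Network flow of uploading a package with flutter pub
1. Get the upload URL
Send a request to api/packages/versions/new.
Parameters:

The response provides the target URL and the parameters needed for the package upload in step 2.
Returns https://storage.googleapis.com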
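To make step 1 concrete, here is a minimal Dart sketch of decoding its response. It assumes the response is a JSON object with a `url` string and a `fields` map of form fields, which is how we understand the hosted pub repository specification; the class `UploadTarget`, the function `parseUploadTarget`, and the sample field name used below are hypothetical.

```dart
import 'dart:convert';

/// Hypothetical holder for the step 1 response: where to upload the archive
/// and which form fields must accompany it.
class UploadTarget {
  final Uri url;
  final Map<String, String> fields;
  UploadTarget(this.url, this.fields);
}

/// Decodes a body assumed to look like
/// {"url": "...", "fields": {"name": "value", ...}}.
UploadTarget parseUploadTarget(String body) {
  final json = jsonDecode(body);
  if (json is! Map<String, dynamic>) {
    throw FormatException('Expected a JSON object.');
  }
  final url = json['url'];
  final fields = json['fields'];
  if (url is! String || fields is! Map) {
    throw FormatException('Missing "url" or "fields".');
  }
  return UploadTarget(
    Uri.parse(url),
    // Normalize keys and values to strings so they can be sent as form fields.
    fields.map((k, v) => MapEntry(k.toString(), v.toString())),
  );
}
```

Calling `parseUploadTarget` on the step 1 response body yields the URL to use in step 2 together with the form fields to send along with the archive.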
2. Upload the file
Send a request to the URL returned in step 1: https://storage.googleapis.com

The response provides the URL and the parameters needed for step 3.
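The request body of step 2 can be sketched as follows. This assumes the upload is a multipart/form-data POST in which the fields from step 1 come first and the archive is sent as a part named `file`; that matches our understanding of the pub client, but treat it as an assumption. The function `buildMultipartBody` and the file name `package.tar.gz` are illustrative.

```dart
import 'dart:convert';
import 'dart:typed_data';

/// Builds a multipart/form-data body: all form fields first, then the
/// archive bytes as a part named "file", then the closing boundary.
Uint8List buildMultipartBody(
    String boundary, Map<String, String> fields, List<int> archiveBytes) {
  final out = BytesBuilder();
  void write(String s) => out.add(utf8.encode(s));

  fields.forEach((name, value) {
    write('--$boundary\r\n');
    write('Content-Disposition: form-data; name="$name"\r\n\r\n');
    write('$value\r\n');
  });

  write('--$boundary\r\n');
  write('Content-Disposition: form-data; name="file"; '
      'filename="package.tar.gz"\r\n');
  write('Content-Type: application/octet-stream\r\n\r\n');
  out.add(archiveBytes);
  write('\r\n--$boundary--\r\n');
  return out.toBytes();
}
```

The request would then carry the header `Content-Type: multipart/form-data; boundary=<boundary>`, with the same boundary string that was passed to the function.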
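3. Finish the upload
Send a request to the URL returned in step 2: https://pub.dev/api/packages/versions/newUploadFinish
The upload can fail in these cases:
- The package was uploaded by someone else and you have no permission to publish it
- You have permission, but the same version number already exists, so you need to change to a new version number
- Other reasons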
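A client has to tell these failure cases apart from a successful publish. The sketch below assumes the finish endpoint answers with JSON shaped either as `{"success": {"message": ...}}` or as `{"error": {"code": ..., "message": ...}}`, which is our reading of the hosted pub repository specification; the function name `readFinishResult` is hypothetical.

```dart
import 'dart:convert';

/// Returns the success message, or throws a StateError carrying the
/// server-side error message and code when the upload was rejected.
String readFinishResult(String body) {
  final json = jsonDecode(body);
  if (json is Map && json['error'] is Map) {
    final error = json['error'] as Map;
    throw StateError('Upload rejected: ${error['message']} (${error['code']})');
  }
  if (json is Map && json['success'] is Map) {
    return (json['success'] as Map)['message'].toString();
  }
  // Neither shape matched, so treat the body as malformed.
  throw FormatException('Unexpected response: $body');
}
```

With this helper, a permission failure and a duplicate version both surface as a `StateError` whose text contains the message sent by the server.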

Now let's look at the pub_dev source code.
We focus on how the backend implements the newUploadFinish API that step 3 calls.
pub_dev project URL:
https://github.com/dart-lang/pub-dev
/// Finishes the upload of a package.
Future<PackageVersion> publishUploadedBlob(String guid) async {
  final restriction = await getUploadRestrictionStatus();
  if (restriction == UploadRestrictionStatus.noUploads) {
    throw PackageRejectedException.uploadRestricted();
  }
  final agent = await requireAuthenticatedClient();
  _logger.info('Finishing async upload (uuid: $guid)');
  _logger.info('Reading tarball from cloud storage.');
  return await withTempDirectory((Directory dir) async {
    final filename = '${dir.absolute.path}/tarball.tar.gz';
    final info = await _incomingBucket.tryInfo(tmpObjectName(guid));
    if (info?.length == null) {
      throw PackageRejectedException.archiveEmpty();
    }
    if (info!.length > UploadSignerService.maxUploadSize) {
      throw PackageRejectedException.archiveTooLarge(
          UploadSignerService.maxUploadSize);
    }
    await _saveTarballToFS(
        _incomingBucket.read(tmpObjectName(guid)), filename);
    _logger.info('Examining tarball content ($guid).');
    final sw = Stopwatch()..start();
    final file = File(filename);
    final fileBytes = await file.readAsBytes();
    final sha256Hash = sha256.convert(fileBytes).bytes;
    final archive = await summarizePackageArchive(
      filename,
      maxContentLength: maxAssetContentLength,
      maxArchiveSize: UploadSignerService.maxUploadSize,
      published: clock.now().toUtc(),
    );
    _logger.info('Package archive scanned in ${sw.elapsed}.');
    if (archive.hasIssues) {
      throw PackageRejectedException(archive.issues.first.message);
    }
    final pubspec = Pubspec.fromYaml(archive.pubspecContent!);
    await _verifyPackageName(
      name: pubspec.name,
      agent: agent,
    );
    // Check if new packages are allowed to be uploaded.
    if (restriction == UploadRestrictionStatus.onlyUpdates &&
        !(await isPackageVisible(pubspec.name))) {
      throw PackageRejectedException.uploadRestricted();
    }
    // Check version format.
    final versionString = canonicalizeVersion(pubspec.nonCanonicalVersion);
    if (versionString == null) {
      throw InvalidInputException.canonicalizeVersionError(
          pubspec.nonCanonicalVersion);
    }
    // TODO: check this in pkg/pub_package_reader too
    if (versionString != pubspec.nonCanonicalVersion) {
      throw InvalidInputException.nonCanonicalVersion(
          pubspec.nonCanonicalVersion, versionString);
    }
    // Check canonical archive.
    final canonicalArchivePath =
        tarballObjectName(pubspec.name, versionString);
    final canonicalArchiveInfo =
        await _canonicalBucket.tryInfo(canonicalArchivePath);
    if (canonicalArchiveInfo != null) {
      // Actually fetch the archive bytes and do full comparison.
      final objectBytes =
          await _canonicalBucket.readAsBytes(canonicalArchivePath);
      if (!fileBytes.byteToByteEquals(objectBytes)) {
        throw PackageRejectedException.versionExists(
            pubspec.name, versionString);
      }
    }
    // check existences of referenced packages
    final dependencies = <String>{
      ...pubspec.dependencies.keys,
    };
    for (final name in dependencies) {
      if (isSoftRemoved(name)) {
        continue;
      }
      if (nameTracker.hasPackage(name)) {
        continue;
      }
      // Note: When the name tracker has not yet updated its in-memory cache
      // with recent packages, this check would cause a datastore lookup.
      if (await isPackageVisible(name)) {
        continue;
      }
      throw PackageRejectedException.dependencyDoesNotExists(name);
    }
    sw.reset();
    final entities = await _createUploadEntities(db, agent, archive,
        sha256Hash: sha256Hash);
    final version = await _performTarballUpload(
      entities: entities,
      agent: agent,
      archive: archive,
      guid: guid,
      hasCanonicalArchiveObject: canonicalArchiveInfo != null,
    );
    _logger.info('Tarball uploaded in ${sw.elapsed}.');
    _logger.info('Removing temporary object $guid.');
    sw.reset();
    await _incomingBucket.delete(tmpObjectName(guid));
    _logger.info('Temporary object removed in ${sw.elapsed}.');
    return version;
  });
}
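The flow is as follows:
- First, get the upload restriction status. If uploads are disabled, throw a PackageRejectedException.
- Then, get the authenticated client.
- Then, read the uploaded tarball from cloud storage. An empty or oversized archive is rejected.
- Next, save the tarball in a temporary directory and scan its content. If the tarball has issues, throw a PackageRejectedException.
- Next, extract the package information from the tarball, including the pubspec and its dependencies, and run various checks on the package: whether the package name is valid, whether the version format is canonical, whether the canonical archive already exists, and so on.
- If the canonical archive object already exists, compare the uploaded tarball with the existing object byte by byte. If they differ, throw a PackageRejectedException because the version already exists.
- If a dependency of the package does not exist in the system, throw a PackageRejectedException.
- If all checks pass, create the upload entities in the database and upload the tarball to cloud storage.
- Finally, delete the temporary object from the incoming bucket and return the version information of the uploaded package.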
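The canonical archive check is the part that decides between "this is a harmless retry of the same upload" and "this version already exists". A minimal sketch of that decision, assuming plain byte lists; `sameBytes` is a stand-in for the `byteToByteEquals` helper in the excerpt, and the enum `CanonicalCheck` and function `checkCanonical` are hypothetical names, not part of pub_dev:

```dart
/// Outcome of comparing a fresh upload with an already published archive.
enum CanonicalCheck { newVersion, identicalRetry, conflict }

/// Byte-by-byte equality of two archives.
bool sameBytes(List<int> a, List<int> b) {
  if (a.length != b.length) return false;
  for (var i = 0; i < a.length; i++) {
    if (a[i] != b[i]) return false;
  }
  return true;
}

/// [existing] is null when no canonical archive is stored for this version.
CanonicalCheck checkCanonical(List<int> uploaded, List<int>? existing) {
  if (existing == null) return CanonicalCheck.newVersion;
  return sameBytes(uploaded, existing)
      ? CanonicalCheck.identicalRetry
      : CanonicalCheck.conflict;
}
```

Only the `conflict` outcome corresponds to the `versionExists` rejection; an identical archive is allowed to continue, which is why the excerpt passes `hasCanonicalArchiveObject` on to `_performTarballUpload`.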
The code that verifies the uploader's identity is as follows:
_performTarballUpload -> _requireUploadAuthorization
/// Throws a [ResponseException] if [agent] is **not** authorized to upload package.
///
/// If [package] is null, this is an attempt to publish a new package, not a new version to an existing package.
/// If [package] is not null, this is an attempt to publish [newVersion] of existing package.
Future<void> _requireUploadAuthorization(
    AuthenticatedAgent agent, Package? package, String newVersion) async {
  // new package
  if (package == null) {
    if (agent is AuthenticatedUser) {
      return;
    }
    throw PackageRejectedException.onlyUsersAreAllowedToUploadNewPackages();
  }
  // existing package
  if (package.isNotVisible) {
    throw PackageRejectedException.isBlocked();
  }
  if (agent is AuthenticatedUser &&
      await packageBackend.isPackageAdmin(package, agent.user.userId)) {
    return;
  }
  if (agent is AuthenticatedGithubAction) {
    await _checkGithubActionAllowed(agent, package, newVersion);
    return;
  }
  if (agent is AuthenticatedGcpServiceAccount) {
    await _checkServiceAccountAllowed(agent, package, newVersion);
    return;
  }
  _logger.info('User ${agent.agentId} (${agent.displayId}) '
      'is not an uploader for package ${package.name}, rolling transaction back.');
  throw AuthorizationException.userCannotUploadNewVersion(
      agent.displayId, package.name!);
}
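The authorization rules above can be condensed into a small decision function. This is a simplified model, not the pub_dev implementation: the classes `Agent`, `UserAgent`, `AutomationAgent`, and `Pkg` are hypothetical stand-ins, and the GitHub Actions and service account checks are collapsed into a single `automationAllowed` flag.

```dart
/// Hypothetical, simplified stand-ins for the agent types in the excerpt.
abstract class Agent {}

class UserAgent extends Agent {
  final String userId;
  UserAgent(this.userId);
}

/// Stands in for both GitHub Actions and GCP service account agents.
class AutomationAgent extends Agent {}

class Pkg {
  final Set<String> adminUserIds;
  final bool visible;
  final bool automationAllowed;
  Pkg(this.adminUserIds, {this.visible = true, this.automationAllowed = false});
}

/// Returns true when [agent] may publish. A null [package] means the
/// package does not exist yet, so this would be its first version.
bool mayUpload(Agent agent, Pkg? package) {
  // New package: only a signed-in user may create it.
  if (package == null) return agent is UserAgent;
  // Existing but blocked or hidden package: nobody may publish.
  if (!package.visible) return false;
  // Existing package: a user must be one of its admins.
  if (agent is UserAgent) return package.adminUserIds.contains(agent.userId);
  // Automated agents: allowed only if the package is configured for them.
  return package.automationAllowed;
}
```

In this model the first failure case from step 3 (a package that someone else uploaded) is the branch where a `UserAgent` is not in `adminUserIds`.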