Refactor TSWriter

This commit is contained in:
shogo4405 2018-11-25 02:19:52 +09:00
parent d8e773bf2c
commit c4fa7e5d07
6 changed files with 207 additions and 216 deletions

View File

@ -2,7 +2,7 @@ import AVFoundation
protocol AudioEncoderDelegate: class {
func didSetFormatDescription(audio formatDescription: CMFormatDescription?)
func sampleOutput(audio bytes: UnsafeMutablePointer<UInt8>?, count: UInt32, presentationTimeStamp: CMTime)
func sampleOutput(audio bytes: UnsafePointer<UInt8>?, count: UInt32, presentationTimeStamp: CMTime)
}
// MARK: -

View File

@ -2,7 +2,7 @@ import AVFoundation
open class HTTPStream: NetStream {
private(set) var name: String?
private lazy var tsWriter: TSWriter = TSWriter()
private lazy var tsWriter: TSFileWriter = TSFileWriter()
open func publish(_ name: String?) {
lockQueue.async {

View File

@ -8,6 +8,10 @@ struct AVCFormatStream {
self.data = data
}
init?(bytes: UnsafePointer<UInt8>, count: UInt32) {
self.init(data: Data(bytes: bytes, count: Int(count)))
}
init?(data: Data?) {
guard let data = data else {
return nil

View File

@ -130,12 +130,12 @@ struct PacketizedElementaryStream: PESPacketHeader {
static let untilPacketLengthSize: Int = 6
static let startCode: Data = Data([0x00, 0x00, 0x01])
static func create(_ sampleBuffer: CMSampleBuffer, timestamp: CMTime, config: Any?) -> PacketizedElementaryStream? {
static func create(_ bytes: UnsafePointer<UInt8>?, count: UInt32, presentationTimeStamp: CMTime, decodeTimeStamp: CMTime, timestamp: CMTime, config: Any?, randomAccessIndicator: Bool) -> PacketizedElementaryStream? {
if let config: AudioSpecificConfig = config as? AudioSpecificConfig {
return PacketizedElementaryStream(sampleBuffer: sampleBuffer, timestamp: timestamp, config: config)
return PacketizedElementaryStream(bytes: bytes, count: count, presentationTimeStamp: presentationTimeStamp, decodeTimeStamp: decodeTimeStamp, timestamp: timestamp, config: config)
}
if let config: AVCConfigurationRecord = config as? AVCConfigurationRecord {
return PacketizedElementaryStream(sampleBuffer: sampleBuffer, timestamp: timestamp, config: sampleBuffer.dependsOnOthers ? nil : config)
return PacketizedElementaryStream(bytes: bytes, count: count, presentationTimeStamp: presentationTimeStamp, decodeTimeStamp: decodeTimeStamp, timestamp: timestamp, config: randomAccessIndicator ? config : nil)
}
return nil
}
@ -182,7 +182,7 @@ struct PacketizedElementaryStream: PESPacketHeader {
}
}
init?(bytes: UnsafeMutablePointer<UInt8>?, count: UInt32, presentationTimeStamp: CMTime, timestamp: CMTime, config: AudioSpecificConfig?) {
init?(bytes: UnsafePointer<UInt8>?, count: UInt32, presentationTimeStamp: CMTime, decodeTimeStamp: CMTime, timestamp: CMTime, config: AudioSpecificConfig?) {
guard let bytes = bytes, let config = config else {
return nil
}
@ -204,28 +204,10 @@ struct PacketizedElementaryStream: PESPacketHeader {
}
}
init?(sampleBuffer: CMSampleBuffer, timestamp: CMTime, config: AudioSpecificConfig?) {
guard let payload: Data = sampleBuffer.dataBuffer?.data else {
init?(bytes: UnsafePointer<UInt8>?, count: UInt32, presentationTimeStamp: CMTime, decodeTimeStamp: CMTime, timestamp: CMTime, config: AVCConfigurationRecord?) {
guard let bytes = bytes else {
return nil
}
data.append(contentsOf: config!.adts(payload.count))
data.append(payload)
optionalPESHeader = PESOptionalHeader()
optionalPESHeader?.dataAlignmentIndicator = true
optionalPESHeader?.setTimestamp(
timestamp,
presentationTimeStamp: sampleBuffer.presentationTimeStamp,
decodeTimeStamp: sampleBuffer.decodeTimeStamp
)
let length = data.count + optionalPESHeader!.data.count
if length < Int(UInt16.max) {
packetLength = UInt16(length)
} else {
return nil
}
}
init?(sampleBuffer: CMSampleBuffer, timestamp: CMTime, config: AVCConfigurationRecord?) {
if let config: AVCConfigurationRecord = config {
data += [0x00, 0x00, 0x00, 0x01, 0x09, 0x10]
data += [0x00, 0x00, 0x00, 0x01] + config.sequenceParameterSets[0]
@ -233,15 +215,15 @@ struct PacketizedElementaryStream: PESPacketHeader {
} else {
data += [0x00, 0x00, 0x00, 0x01, 0x09, 0x30]
}
if let stream: AVCFormatStream = AVCFormatStream(data: sampleBuffer.dataBuffer?.data) {
if let stream: AVCFormatStream = AVCFormatStream(bytes: bytes, count: count) {
data.append(stream.toByteStream())
}
optionalPESHeader = PESOptionalHeader()
optionalPESHeader?.dataAlignmentIndicator = true
optionalPESHeader?.setTimestamp(
timestamp,
presentationTimeStamp: sampleBuffer.presentationTimeStamp,
decodeTimeStamp: sampleBuffer.decodeTimeStamp
presentationTimeStamp: presentationTimeStamp,
decodeTimeStamp: decodeTimeStamp
)
let length = data.count + optionalPESHeader!.data.count
if length < Int(UInt16.max) {

View File

@ -5,39 +5,14 @@ public protocol TSWriterDelegate: class {
func didOutput(_ data: Data)
}
public class TSWriter {
static let defaultPATPID: UInt16 = 0
static let defaultPMTPID: UInt16 = 4095
static let defaultVideoPID: UInt16 = 256
static let defaultAudioPID: UInt16 = 257
static let defaultSegmentCount: Int = 3
static let defaultSegmentMaxCount: Int = 12
static let defaultSegmentDuration: Double = 2
var playlist: String {
var m3u8: M3U = M3U()
m3u8.targetDuration = segmentDuration
if sequence <= TSWriter.defaultSegmentMaxCount {
m3u8.mediaSequence = 0
m3u8.mediaList = files
for mediaItem in m3u8.mediaList where mediaItem.duration > m3u8.targetDuration {
m3u8.targetDuration = mediaItem.duration + 1
}
return m3u8.description
}
let startIndex = max(0, files.count - TSWriter.defaultSegmentCount)
m3u8.mediaSequence = sequence - TSWriter.defaultSegmentMaxCount
m3u8.mediaList = Array(files[startIndex..<files.count])
for mediaItem in m3u8.mediaList where mediaItem.duration > m3u8.targetDuration {
m3u8.targetDuration = mediaItem.duration + 1
}
return m3u8.description
}
let lockQueue = DispatchQueue(label: "com.haishinkit.HaishinKit.TSWriter.lock")
var segmentMaxCount: Int = TSWriter.defaultSegmentMaxCount
var segmentDuration: Double = TSWriter.defaultSegmentDuration
public class TSWriter: Running {
static public let defaultPATPID: UInt16 = 0
static public let defaultPMTPID: UInt16 = 4095
static public let defaultVideoPID: UInt16 = 256
static public let defaultAudioPID: UInt16 = 257
public weak var delegate: TSWriterDelegate?
public internal(set) var isRunning: Bool = false
private(set) var PAT: ProgramAssociationSpecific = {
let PAT: ProgramAssociationSpecific = ProgramAssociationSpecific()
@ -46,29 +21,19 @@ public class TSWriter {
}()
private(set) var PMT: ProgramMapSpecific = ProgramMapSpecific()
private(set) var files: [M3UMediaInfo] = []
private(set) public var isRunning: Bool = false
private var PCRPID: UInt16 = TSWriter.defaultVideoPID
private var sequence: Int = 0
private var timestamps: [UInt16: CMTime] = [: ]
var PCRPID: UInt16 = TSWriter.defaultVideoPID
private var timestamps: [UInt16: CMTime] = [:]
private var audioConfig: AudioSpecificConfig?
private var videoConfig: AVCConfigurationRecord?
private var PCRTimestamp: CMTime = CMTime.zero
private var currentFileURL: URL?
private var rotatedTimestamp: CMTime = CMTime.zero
private var currentFileHandle: FileHandle?
private var continuityCounters: [UInt16: UInt8] = [: ]
var continuityCounters: [UInt16: UInt8] = [:]
public init() {
}
func getFilePath(_ fileName: String) -> String? {
return files.first {
$0.url.absoluteString.contains(fileName)
}?.url.path
}
final func writeSampleBuffer(_ PID: UInt16, streamID: UInt8, bytes: UnsafePointer<UInt8>?, count: UInt32, presentationTimeStamp: CMTime, decodeTimeStamp: CMTime, randomAccessIndicator: Bool) {
func writeSampleBuffer(_ PID: UInt16, streamID: UInt8, bytes: UnsafeMutablePointer<UInt8>?, count: UInt32, presentationTimeStamp: CMTime) {
if timestamps[PID] == nil {
timestamps[PID] = presentationTimeStamp
if PCRPID == PID {
@ -76,66 +41,25 @@ public class TSWriter {
}
}
guard var PES = PacketizedElementaryStream(bytes: bytes, count: count, presentationTimeStamp: presentationTimeStamp, timestamp: timestamps[PID]!, config: audioConfig) else {
guard var PES = PacketizedElementaryStream.create(
bytes,
count: count,
presentationTimeStamp: presentationTimeStamp,
decodeTimeStamp: decodeTimeStamp,
timestamp: timestamps[PID]!,
config: streamID == 192 ? audioConfig : videoConfig,
randomAccessIndicator: randomAccessIndicator) else {
return
}
PES.streamID = streamID
let decodeTimeStamp: CMTime = presentationTimeStamp
var packets: [TSPacket] = split(PID, PES: PES, timestamp: decodeTimeStamp)
_ = rotateFileHandle(decodeTimeStamp)
if streamID == 192 {
packets[0].adaptationField?.randomAccessIndicator = true
}
var bytes: Data = Data()
for var packet in packets {
packet.continuityCounter = continuityCounters[PID]!
continuityCounters[PID] = (continuityCounters[PID]! + 1) & 0x0f
bytes.append(packet.data)
}
nstry({
self.delegate?.didOutput(bytes)
self.currentFileHandle?.write(bytes)
}, { exception in
self.currentFileHandle?.write(bytes)
logger.warn("\(exception)")
})
}
func writeSampleBuffer(_ PID: UInt16, streamID: UInt8, sampleBuffer: CMSampleBuffer) {
let presentationTimeStamp: CMTime = sampleBuffer.presentationTimeStamp
if timestamps[PID] == nil {
timestamps[PID] = presentationTimeStamp
if PCRPID == PID {
PCRTimestamp = presentationTimeStamp
}
}
let config: Any? = streamID == 192 ? audioConfig : videoConfig
guard var PES: PacketizedElementaryStream = PacketizedElementaryStream.create(
sampleBuffer, timestamp: timestamps[PID]!, config: config
) else {
return
}
PES.streamID = streamID
var decodeTimeStamp: CMTime = sampleBuffer.decodeTimeStamp
if decodeTimeStamp == CMTime.invalid {
decodeTimeStamp = presentationTimeStamp
}
var packets: [TSPacket] = split(PID, PES: PES, timestamp: decodeTimeStamp)
_ = rotateFileHandle(decodeTimeStamp)
rotateFileHandle(decodeTimeStamp == CMTime.invalid ? presentationTimeStamp : decodeTimeStamp)
if streamID == 192 {
packets[0].adaptationField?.randomAccessIndicator = true
} else {
packets[0].adaptationField?.randomAccessIndicator = !sampleBuffer.dependsOnOthers
packets[0].adaptationField?.randomAccessIndicator = randomAccessIndicator
}
var bytes: Data = Data()
@ -145,16 +69,43 @@ public class TSWriter {
bytes.append(packet.data)
}
nstry({
self.delegate?.didOutput(bytes)
self.currentFileHandle?.write(bytes)
}, { exception in
self.currentFileHandle?.write(bytes)
logger.warn("\(exception)")
})
write(bytes)
}
func split(_ PID: UInt16, PES: PacketizedElementaryStream, timestamp: CMTime) -> [TSPacket] {
func rotateFileHandle(_ timestamp: CMTime) {
}
func write(_ data: Data) {
delegate?.didOutput(data)
}
func writeProgram() {
PMT.PCRPID = PCRPID
var bytes: Data = Data()
var packets: [TSPacket] = []
packets.append(contentsOf: PAT.arrayOfPackets(TSWriter.defaultPATPID))
packets.append(contentsOf: PMT.arrayOfPackets(TSWriter.defaultPMTPID))
for packet in packets {
bytes.append(packet.data)
}
write(bytes)
}
public func startRunning() {
guard isRunning else {
return
}
isRunning = true
}
public func stopRunning() {
guard !isRunning else {
return
}
isRunning = false
}
private func split(_ PID: UInt16, PES: PacketizedElementaryStream, timestamp: CMTime) -> [TSPacket] {
var PCR: UInt64?
let duration: Double = timestamp.seconds - PCRTimestamp.seconds
if PCRPID == PID && 0.02 <= duration {
@ -167,11 +118,114 @@ public class TSWriter {
}
return packets
}
}
extension TSWriter: AudioEncoderDelegate {
// MARK: AudioEncoderDelegate
func didSetFormatDescription(audio formatDescription: CMFormatDescription?) {
guard let formatDescription: CMAudioFormatDescription = formatDescription else {
return
}
var data: ElementaryStreamSpecificData = ElementaryStreamSpecificData()
data.streamType = ElementaryStreamType.adtsaac.rawValue
data.elementaryPID = TSWriter.defaultAudioPID
PMT.elementaryStreamSpecificData.append(data)
continuityCounters[TSWriter.defaultAudioPID] = 0
audioConfig = AudioSpecificConfig(formatDescription: formatDescription)
}
func sampleOutput(audio bytes: UnsafePointer<UInt8>?, count: UInt32, presentationTimeStamp: CMTime) {
writeSampleBuffer(
TSWriter.defaultAudioPID,
streamID: 192,
bytes: bytes,
count: count,
presentationTimeStamp: presentationTimeStamp,
decodeTimeStamp: CMTime.invalid,
randomAccessIndicator: false
)
}
}
extension TSWriter: VideoEncoderDelegate {
// MARK: VideoEncoderDelegate
func didSetFormatDescription(video formatDescription: CMFormatDescription?) {
guard
let formatDescription: CMFormatDescription = formatDescription,
let avcC: Data = AVCConfigurationRecord.getData(formatDescription) else {
return
}
var data: ElementaryStreamSpecificData = ElementaryStreamSpecificData()
data.streamType = ElementaryStreamType.h264.rawValue
data.elementaryPID = TSWriter.defaultVideoPID
PMT.elementaryStreamSpecificData.append(data)
continuityCounters[TSWriter.defaultVideoPID] = 0
videoConfig = AVCConfigurationRecord(data: avcC)
}
func sampleOutput(video sampleBuffer: CMSampleBuffer) {
guard let dataBuffer = sampleBuffer.dataBuffer else {
return
}
var length: Int = 0
var buffer: UnsafeMutablePointer<Int8>?
guard CMBlockBufferGetDataPointer(dataBuffer, atOffset: 0, lengthAtOffsetOut: nil, totalLengthOut: &length, dataPointerOut: &buffer) == noErr else {
return
}
guard let bytes = buffer else {
return
}
writeSampleBuffer(
TSWriter.defaultVideoPID,
streamID: 224,
bytes: UnsafeRawPointer(bytes).bindMemory(to: UInt8.self, capacity: length),
count: UInt32(length),
presentationTimeStamp: sampleBuffer.presentationTimeStamp,
decodeTimeStamp: sampleBuffer.decodeTimeStamp,
randomAccessIndicator: !sampleBuffer.dependsOnOthers
)
}
}
class TSFileWriter: TSWriter {
static let defaultSegmentCount: Int = 3
static let defaultSegmentMaxCount: Int = 12
static let defaultSegmentDuration: Double = 2
var segmentMaxCount: Int = TSFileWriter.defaultSegmentMaxCount
var segmentDuration: Double = TSFileWriter.defaultSegmentDuration
private(set) var files: [M3UMediaInfo] = []
private var currentFileHandle: FileHandle?
private var currentFileURL: URL?
private var sequence: Int = 0
private var rotatedTimestamp: CMTime = CMTime.zero
var playlist: String {
var m3u8: M3U = M3U()
m3u8.targetDuration = segmentDuration
if sequence <= TSFileWriter.defaultSegmentMaxCount {
m3u8.mediaSequence = 0
m3u8.mediaList = files
for mediaItem in m3u8.mediaList where mediaItem.duration > m3u8.targetDuration {
m3u8.targetDuration = mediaItem.duration + 1
}
return m3u8.description
}
let startIndex = max(0, files.count - TSFileWriter.defaultSegmentCount)
m3u8.mediaSequence = sequence - TSFileWriter.defaultSegmentMaxCount
m3u8.mediaList = Array(files[startIndex..<files.count])
for mediaItem in m3u8.mediaList where mediaItem.duration > m3u8.targetDuration {
m3u8.targetDuration = mediaItem.duration + 1
}
return m3u8.description
}
override func rotateFileHandle(_ timestamp: CMTime) {
super.rotateFileHandle(timestamp)
func rotateFileHandle(_ timestamp: CMTime) -> Bool {
let duration: Double = timestamp.seconds - rotatedTimestamp.seconds
if duration <= segmentDuration {
return false
return
}
let fileManager: FileManager = FileManager.default
@ -200,7 +254,7 @@ public class TSWriter {
}
fileManager.createFile(atPath: url.path, contents: nil, attributes: nil)
if TSWriter.defaultSegmentMaxCount <= files.count {
if TSFileWriter.defaultSegmentMaxCount <= files.count {
let info: M3UMediaInfo = files.removeFirst()
do {
try fileManager.removeItem(at: info.url as URL)
@ -222,27 +276,42 @@ public class TSWriter {
currentFileHandle?.closeFile()
currentFileHandle = try? FileHandle(forWritingTo: url)
PMT.PCRPID = PCRPID
var bytes: Data = Data()
var packets: [TSPacket] = []
packets.append(contentsOf: PAT.arrayOfPackets(TSWriter.defaultPATPID))
packets.append(contentsOf: PMT.arrayOfPackets(TSWriter.defaultPMTPID))
for packet in packets {
bytes.append(packet.data)
}
writeProgram()
rotatedTimestamp = timestamp
}
override func write(_ data: Data) {
nstry({
self.delegate?.didOutput(bytes)
self.currentFileHandle?.write(bytes)
self.currentFileHandle?.write(data)
}, { exception in
logger.warn("\(exception)")
})
rotatedTimestamp = timestamp
return true
super.write(data)
}
func removeFiles() {
override func startRunning() {
guard isRunning else {
return
}
isRunning = true
}
override func stopRunning() {
guard !isRunning else {
return
}
currentFileURL = nil
currentFileHandle = nil
removeFiles()
isRunning = false
}
func getFilePath(_ fileName: String) -> String? {
return files.first { $0.url.absoluteString.contains(fileName) }?.url.path
}
private func removeFiles() {
let fileManager: FileManager = FileManager.default
for info in files {
do {
@ -254,67 +323,3 @@ public class TSWriter {
files.removeAll()
}
}
extension TSWriter: Running {
// MARK: Running
public func startRunning() {
lockQueue.async {
guard self.isRunning else {
return
}
self.isRunning = true
}
}
public func stopRunning() {
lockQueue.async {
guard !self.isRunning else {
return
}
self.currentFileURL = nil
self.currentFileHandle = nil
self.removeFiles()
self.isRunning = false
}
}
}
extension TSWriter: AudioEncoderDelegate {
// MARK: AudioEncoderDelegate
func didSetFormatDescription(audio formatDescription: CMFormatDescription?) {
guard let formatDescription: CMAudioFormatDescription = formatDescription else {
return
}
audioConfig = AudioSpecificConfig(formatDescription: formatDescription)
var data: ElementaryStreamSpecificData = ElementaryStreamSpecificData()
data.streamType = ElementaryStreamType.adtsaac.rawValue
data.elementaryPID = TSWriter.defaultAudioPID
PMT.elementaryStreamSpecificData.append(data)
continuityCounters[TSWriter.defaultAudioPID] = 0
}
func sampleOutput(audio bytes: UnsafeMutablePointer<UInt8>?, count: UInt32, presentationTimeStamp: CMTime) {
writeSampleBuffer(TSWriter.defaultAudioPID, streamID: 192, bytes: bytes, count: count, presentationTimeStamp: presentationTimeStamp)
}
}
extension TSWriter: VideoEncoderDelegate {
// MARK: VideoEncoderDelegate
func didSetFormatDescription(video formatDescription: CMFormatDescription?) {
guard
let formatDescription: CMFormatDescription = formatDescription,
let avcC: Data = AVCConfigurationRecord.getData(formatDescription) else {
return
}
videoConfig = AVCConfigurationRecord(data: avcC)
var data: ElementaryStreamSpecificData = ElementaryStreamSpecificData()
data.streamType = ElementaryStreamType.h264.rawValue
data.elementaryPID = TSWriter.defaultVideoPID
PMT.elementaryStreamSpecificData.append(data)
continuityCounters[TSWriter.defaultVideoPID] = 0
}
func sampleOutput(video sampleBuffer: CMSampleBuffer) {
writeSampleBuffer(TSWriter.defaultVideoPID, streamID: 224, sampleBuffer: sampleBuffer)
}
}

View File

@ -33,7 +33,7 @@ extension RTMPMuxer: AudioEncoderDelegate {
delegate?.sampleOutput(audio: buffer, withTimestamp: 0, muxer: self)
}
func sampleOutput(audio bytes: UnsafeMutablePointer<UInt8>?, count: UInt32, presentationTimeStamp: CMTime) {
func sampleOutput(audio bytes: UnsafePointer<UInt8>?, count: UInt32, presentationTimeStamp: CMTime) {
let delta: Double = (audioTimestamp == CMTime.zero ? 0 : presentationTimeStamp.seconds - audioTimestamp.seconds) * 1000
guard let bytes = bytes, 0 <= delta else {
return