In [1]:
// imports
import de.l3s.archivespark._
import de.l3s.archivespark.implicits._
import de.l3s.archivespark.enrich._
import de.l3s.archivespark.enrich.functions._
import de.l3s.archivespark.specific.warc.implicits._
import de.l3s.archivespark.specific.warc._
import de.l3s.archivespark.specific.warc.specs._

In [2]:
// Build the dataset from the CDX index files and the directory holding the
// matching WARC files (per the original note, WARC content is read lazily).
// NOTE(review): the paths are described as local-filesystem paths, yet the
// spec used is the Hdfs variant - confirm which filesystem they resolve against.
val rdd = ArchiveSpark.load(
  sc,
  WarcCdxHdfsSpec("/cdx/*.cdx", "/warc")
)

In [3]:
// total number of records in the loaded dataset
rdd.count()

3

In [4]:
// render the first record as a JSON string; with no enrichment applied,
// only the CDX fields show up under "record"
rdd.first().toJsonString

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20140103030321",
    "digest":"B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
    "originalUrl":"http://example.com?example=1",
    "surtUrl":"com,example)/?example=1",
    "mime":"text/html",
    "compressedSize":1043,
    "meta":"-",
    "status":200
  }
}

In [5]:
// enrich with WarcPayload to pull in the corresponding WARC record,
// then show the first enriched record as JSON
rdd.enrich(WarcPayload).first().toJsonString

{
  "httpStatusLine":"HTTP/1.1 200 OK",
  "recordHeader":{
    "WARC-Target-URI":"http://example.com?example=1",
    "WARC-Date":"2014-01-03T03:03:21Z",
    "WARC-Type":"response",
    "Content-Length":"1610",
    "WARC-Payload-Digest":"sha1:B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
    "Content-Type":"application/http; msgtype=response",
    "absolute-offset":"0",
    "WARC-Record-ID":"<urn:uuid:6d058047-ede2-4a13-be79-90c17c631dd4>",
    "reader-identifier":"example.warc.gz",
    "WARC-Warcinfo-ID":"<urn:uuid:fbd6cf0a-6160-4550-b343-12188dc05234>"
  },
  "payload":"bytes(length: 1270)",
  "httpHeader":{
    "Last-Modified":"Fri, 09 Aug 2013 23:54:35 GMT",
    "X-Cache":"HIT",
    "Server":"ECS (sjc/4FCE)",
    "Accept-Ranges":"bytes",
    "Etag":"\"359670651\"",
    "Expires":...

In [6]:
// keep only records that were served successfully (status 200) as text/html
val htmlOnline = rdd.filter { record =>
  record.status == 200 && record.mime == "text/html"
}
// how many records passed the filter
htmlOnline.count()

1

In [7]:
// enrich with StringContent so the WARC payload is attached as a plain-text
// string, then show the first record as JSON
htmlOnline.enrich(StringContent).first().toJsonString

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20140103030321",
    "digest":"B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
    "originalUrl":"http://example.com?example=1",
    "surtUrl":"com,example)/?example=1",
    "mime":"text/html",
    "compressedSize":1043,
    "meta":"-",
    "status":200
  },
  "payload":{
    "string":"<!doctype html>\n<html>\n<head>\n    <title>Example Domain</title>\n\n    <meta charset=\"utf-8\" />\n    <meta http-equiv=\"Content-type\" content=\"text/html; charset=utf-8\" />\n    <meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />\n    <style type=\"text/css\">\n    body {\n        background-color: #f0f0f2;\n        margin: 0;\n        padding: 0;\n        font-family: \"Open Sans\", \"Helvetica Neue\", Helvetica, Aria...

In [8]:
// derive a "length" field from the StringContent value (the length of the
// plain-text string) and show the first record as JSON
htmlOnline.mapEnrich(StringContent, "length")(_.length).first().toJsonString

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20140103030321",
    "digest":"B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
    "originalUrl":"http://example.com?example=1",
    "surtUrl":"com,example)/?example=1",
    "mime":"text/html",
    "compressedSize":1043,
    "meta":"-",
    "status":200
  },
  "payload":{
    "string":{
      "length":1269
    }
  }
}

In [9]:
// run the Html("div") extractor on top of the StringContent value so every
// <div> element found in the plain text is attached, then show the first record
htmlOnline.enrich(Html("div").on(StringContent)).first().toJsonString

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20140103030321",
    "digest":"B2LTWWPUOYAH7UIPQ7ZUPQ4VMBSVC36A",
    "originalUrl":"http://example.com?example=1",
    "surtUrl":"com,example)/?example=1",
    "mime":"text/html",
    "compressedSize":1043,
    "meta":"-",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "div":[
          "<div> \n <h1>Example Domain</h1> \n <p>This domain is established to be used for illustrative examples in documents. You may use this domain in examples without prior coordination or asking for permission.</p> \n <p><a href=\"http://www.iana.org/domains/example\">More information...</a></p> \n</div>"
        ]
      }
    }
  }
}

In [9]:
// map/reduce: count records per domain key and list the keys by descending count
// Capture group 1 is the host-like part after an optional scheme, user-info and "www.".
val domainRegex = """^(?:https?:\/\/)?(?:[^@\/\n]+@)?(?:www\.)?([^:\/\n]+)""".r

// NOTE(review): the pattern is shaped for http(s) URLs but is matched against
// the SURT form (r.surtUrl), so the key is everything up to the first '/' or ':'
// and keeps the SURT's closing ')'. Confirm this is intended.
rdd.map { r =>
  val domain = domainRegex.findFirstMatchIn(r.surtUrl).map(_.group(1))
  (domain, 1)
}.reduceByKey(_ + _).map(_.swap).sortByKey(ascending = false).collect()

Array((2,Some(com,example))), (1,Some(org,iana))))

In [10]:
// load the records for l3s.de through the Internet Archive Wayback Machine;
// matchPrefix = true is passed so the query is treated as a prefix match
val rdd2 = ArchiveSpark.load(
  sc,
  WaybackSpec("l3s.de", matchPrefix = true)
)
// number of records returned
rdd2.count()

27151

In [11]:
// successfully served (status 200) text/html records, made distinct on surtUrl;
// when two records share a surtUrl the merge function keeps its first argument
val uniquePages = rdd2.filter { page =>
  page.status == 200 && page.mime == "text/html"
}.distinctValue(_.surtUrl) { (kept, _) => kept }
// enrich function for the title: the text of the first <title> element
val title = HtmlText.of(Html.first("title"))
// evaluate the title function and print the first 10 values
for (pageTitle <- uniquePages.mapValues(title).take(10)) println(pageTitle)

L3S Forschungszentrum | ProPerBoundsNGI - project view
javax.swing.plaf.basic Class Hierarchy (Edutella (Service Extensions) API)
L3S Forschungszentrum | Projekte
Aktuelles - Inhalt
L3S Forschungszentrum | Publikationen
XSLTErrorResources_it (Edutella (Service Extensions) API)
RendezvousEvent (Project JXTA (TM) Technology 0.1 API Documentation)
Aktuelles - Inhalt
L3S Forschungszentrum | Anmeldung
org.omg.PortableServer.POAPackage (Edutella (Service Extensions) API)


In [12]:
// attach the title text to every unique page
val withTitle = uniquePages.enrich(title)
// drop pages with a missing or short title: keep only titles that split into
// more than 5 space-separated tokens. `exists` is false for an absent title,
// which avoids calling `.get` on the Option.
val interestingTitlePages = withTitle.filterValue(title) { t =>
  t.exists(_.split(" ").length > 5)
}
// evaluate the title function and print the first 10 values
interestingTitlePages.mapValues(title).take(10).foreach(println)

L3S Forschungszentrum | ProPerBoundsNGI - project view
javax.swing.plaf.basic Class Hierarchy (Edutella (Service Extensions) API)
RendezvousEvent (Project JXTA (TM) Technology 0.1 API Documentation)
ID (Project JXTA (TM) Technology 0.1 API Documentation)
Übung "Bilderzeugung mit einem Laser", Aufgabenstellung und Dokumentation
L3S Forschungszentrum | Willkommen auf der ECF 2014 Artikel
L3S Research Center - European “digital libraries” Award for L3S researchers
L3S Forschungszentrum | Dr. Mohammad Alrifai
Forschungszentrum L3S - HELCA: Hannover eLearning Campus
L3S Research Center | Co-organizing the TEFA 2013 Workshop at EC-TELarticle


In [13]:
// extract entities from the title text
// NOTE(review): this step was labelled "LDA", but the recorded output groups
// the result into persons / organizations / locations / dates, which looks
// like named-entity recognition - confirm and adjust the label.
val titleEntities = Entities.of(title)
// enrich, take the first 10 records to the driver, redistribute them and
// drop the records whose entities value is empty
val records = sc.parallelize(
  interestingTitlePages.enrich(titleEntities).take(10)
).filterNonEmpty(titleEntities)
// print each remaining record as JSON
for (r <- records.collect()) println(r.toJsonString)

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20060904153618",
    "digest":"WSZAXBRAUG66KYIYXT5UGVKE2KJTGII2",
    "originalUrl":"http://www.l3s.de:80/~brunkhor/javadoc/j2sdk1.4.2/javax/swing/plaf/basic/package-tree.html",
    "surtUrl":"de,l3s)/~brunkhor/javadoc/j2sdk1.4.2/javax/swing/plaf/basic/package-tree.html",
    "mime":"text/html",
    "compressedSize":6212,
    "meta":"-",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "title":{
          "text":{
            "_":"javax.swing.plaf.basic Class Hierarchy (Edutella (Service Extensions) API)",
            "entities":{
              "persons":[
                "Edutella"
              ],
              "organizations":[
                
              ],
              "locations":[
                
              ],
              "dates":[
                
              ]
            }
          }
        }
      }
    }
  }
}
{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20070401085209",


In [14]:
// save the entity-enriched records as JSON
// (presumably "entities" is the output path - verify where it resolves to)
records.saveAsJson("entities")